diff --git a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 index d1ab06f852c8..ef070057a3da 100644 --- a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 +++ b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 @@ -73,7 +73,13 @@ statement | ALTER TABLE multipartIdentifier WRITE writeSpec #setWriteDistributionAndOrdering | ALTER TABLE multipartIdentifier SET IDENTIFIER_KW FIELDS fieldList #setIdentifierFields | ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList #dropIdentifierFields - | ALTER TABLE multipartIdentifier createReplaceBranchClause #createOrReplaceBranch + | ALTER TABLE multipartIdentifier createReplaceBranchClause #createOrReplaceBranch + | ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag + ; + +createReplaceTagClause + : (CREATE OR)? REPLACE TAG identifier tagOptions + | CREATE TAG (IF NOT EXISTS)? identifier tagOptions ; createReplaceBranchClause @@ -81,8 +87,13 @@ createReplaceBranchClause | CREATE BRANCH (IF NOT EXISTS)? identifier branchOptions ; +tagOptions + : (AS OF VERSION snapshotId)? (refRetain)? + ; + branchOptions - : (AS OF VERSION snapshotId)? (refRetain)? (snapshotRetention)?; + : (AS OF VERSION snapshotId)? (refRetain)? (snapshotRetention)? + ; snapshotRetention : WITH SNAPSHOT RETENTION minSnapshotsToKeep @@ -197,7 +208,7 @@ fieldList nonReserved : ADD | ALTER | AS | ASC | BRANCH | BY | CALL | CREATE | DAYS | DESC | DROP | EXISTS | FIELD | FIRST | HOURS | IF | LAST | NOT | NULLS | OF | OR | ORDERED | PARTITION | TABLE | WRITE | DISTRIBUTED | LOCALLY | MINUTES | MONTHS | UNORDERED | REPLACE | RETAIN | VERSION | WITH | IDENTIFIER_KW | FIELDS | SET | SNAPSHOT | SNAPSHOTS - | TRUE | FALSE + | TAG | TRUE | FALSE | MAP ; @@ -251,6 +262,7 @@ SET: 'SET'; SNAPSHOT: 'SNAPSHOT'; SNAPSHOTS: 'SNAPSHOTS'; TABLE: 'TABLE'; +TAG: 'TAG'; UNORDERED: 'UNORDERED'; VERSION: 'VERSION'; WITH: 'WITH'; diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala index 8d3250a7de5b..946c10d193bf 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala @@ -210,7 +210,10 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI } private def isSnapshotRefDdl(normalized: String): Boolean = { - normalized.contains("create branch") || normalized.contains("replace branch") + normalized.contains("create branch") || + normalized.contains("replace branch") || + normalized.contains("create tag") || + normalized.contains("replace tag") } protected def parse[T](command: String)(toResult: IcebergSqlExtensionsParser => T): T = { diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala index 11c60b610cac..b02897aa7664 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical.BranchOptions import org.apache.spark.sql.catalyst.plans.logical.CallArgument import org.apache.spark.sql.catalyst.plans.logical.CallStatement import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch +import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -49,6 +50,7 @@ import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering +import org.apache.spark.sql.catalyst.plans.logical.TagOptions import org.apache.spark.sql.catalyst.trees.CurrentOrigin import org.apache.spark.sql.catalyst.trees.Origin import org.apache.spark.sql.connector.expressions @@ -130,6 +132,35 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS ifNotExists) } + /** + * Create an CREATE OR REPLACE TAG logical command. + */ + override def visitCreateOrReplaceTag(ctx: CreateOrReplaceTagContext): CreateOrReplaceTag = withOrigin(ctx) { + val createTagClause = ctx.createReplaceTagClause() + + val tagName = createTagClause.identifier().getText + + val tagOptionsContext = Option(createTagClause.tagOptions()) + val snapshotId = tagOptionsContext.flatMap(tagOptions => Option(tagOptions.snapshotId())) + .map(_.getText.toLong) + val tagRetain = tagOptionsContext.flatMap(tagOptions => Option(tagOptions.refRetain())) + val tagRefAgeMs = tagRetain.map(retain => + TimeUnit.valueOf(retain.timeUnit().getText.toUpperCase(Locale.ENGLISH)).toMillis(retain.number().getText.toLong)) + val tagOptions = TagOptions( + snapshotId, + tagRefAgeMs + ) + + val replace = createTagClause.REPLACE() != null + val ifNotExists = createTagClause.EXISTS() != null + + CreateOrReplaceTag(typedVisit[Seq[String]](ctx.multipartIdentifier), + tagName, + tagOptions, + replace, + ifNotExists) + } + /** * Create an REPLACE PARTITION FIELD logical command. */ diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala new file mode 100644 index 000000000000..e48f7d8ed04c --- /dev/null +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.Attribute + +case class CreateOrReplaceTag( + table: Seq[String], + tag: String, + tagOptions: TagOptions, + replace: Boolean, + ifNotExists: Boolean) extends LeafCommand { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override def simpleString(maxFields: Int): String = { + s"CreateOrReplaceTag tag: ${tag} for table: ${table.quoted}" + } +} diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TagOptions.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TagOptions.scala new file mode 100644 index 000000000000..85e3b95f4aba --- /dev/null +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TagOptions.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +case class TagOptions(snapshotId: Option[Long], snapshotRefRetain: Option[Long]) diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala new file mode 100644 index 000000000000..d41f9f03ff4c --- /dev/null +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.source.SparkTable +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.TagOptions +import org.apache.spark.sql.connector.catalog._ + +case class CreateOrReplaceTagExec( + catalog: TableCatalog, + ident: Identifier, + tag: String, + tagOptions: TagOptions, + replace: Boolean, + ifNotExists: Boolean) extends LeafV2CommandExec { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + catalog.loadTable(ident) match { + case iceberg: SparkTable => + val snapshotId = tagOptions.snapshotId.getOrElse(iceberg.table.currentSnapshot().snapshotId()) + val manageSnapshot = iceberg.table.manageSnapshots() + if (!replace) { + val ref = iceberg.table().refs().get(tag); + if (ref != null && ifNotExists) { + return Nil + } + + manageSnapshot.createTag(tag, snapshotId) + } else { + manageSnapshot.replaceTag(tag, snapshotId) + } + + if (tagOptions.snapshotRefRetain.nonEmpty) { + manageSnapshot.setMaxRefAgeMs(tag, tagOptions.snapshotRefRetain.get) + } + + manageSnapshot.commit() + + case table => + throw new UnsupportedOperationException(s"Cannot create tag to non-Iceberg table: $table") + } + + Nil + } + + override def simpleString(maxFields: Int): String = { + s"Create tag: ${tag} for table: ${ident.quoted}" + } +} diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index 7e343534dede..6c7ededf88bb 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.PredicateHelper import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField import org.apache.spark.sql.catalyst.plans.logical.Call import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch +import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField @@ -66,6 +67,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi IcebergCatalogAndIdentifier(catalog, ident), branch, branchOptions, replace, ifNotExists) => CreateOrReplaceBranchExec(catalog, ident, branch, branchOptions, replace, ifNotExists) :: Nil + case CreateOrReplaceTag(IcebergCatalogAndIdentifier(catalog, ident), tag, tagOptions, replace, ifNotExists) => + CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, replace, ifNotExists) :: Nil + case DropPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform) => DropPartitionFieldExec(catalog, ident, transform) :: Nil diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java new file mode 100644 index 000000000000..def040c15e59 --- /dev/null +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runners.Parameterized; + +public class TestTagDDL extends SparkExtensionsTestBase { + private static final String[] TIME_UNITS = {"DAYS", "HOURS", "MINUTES"}; + + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + public static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties() + } + }; + } + + public TestTagDDL(String catalogName, String implementation, Map config) { + super(catalogName, implementation, config); + } + + @Before + public void before() { + sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); + } + + @After + public void removeTable() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testCreateTagWithRetain() throws NoSuchTableException { + Table table = insertRows(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + long maxRefAge = 10L; + + List records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + + for (String timeUnit : TIME_UNITS) { + String tagName = "t1" + timeUnit; + sql( + "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d %s", + tableName, tagName, firstSnapshotId, maxRefAge, timeUnit); + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals(firstSnapshotId, ref.snapshotId()); + Assert.assertEquals( + TimeUnit.valueOf(timeUnit.toUpperCase(Locale.ENGLISH)).toMillis(maxRefAge), + ref.maxRefAgeMs().longValue()); + } + + String tagName = "t1"; + AssertHelpers.assertThrows( + "Illegal statement", + IcebergParseException.class, + "mismatched input", + () -> + sql( + "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN", + tableName, tagName, firstSnapshotId, maxRefAge)); + + AssertHelpers.assertThrows( + "Illegal statement", + IcebergParseException.class, + "mismatched input", + () -> sql("ALTER TABLE %s CREATE TAG %s RETAIN %s DAYS", tableName, tagName, "abc")); + + AssertHelpers.assertThrows( + "Illegal statement", + IcebergParseException.class, + "mismatched input 'SECONDS' expecting {'DAYS', 'HOURS', 'MINUTES'}", + () -> + sql( + "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d SECONDS", + tableName, tagName, firstSnapshotId, maxRefAge)); + } + + @Test + public void testCreateTagUseDefaultConfig() throws NoSuchTableException { + Table table = insertRows(); + long snapshotId = table.currentSnapshot().snapshotId(); + String tagName = "t1"; + + AssertHelpers.assertThrows( + "unknown snapshot", + ValidationException.class, + "unknown snapshot: -1", + () -> sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName, tagName, -1)); + + sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName); + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals(snapshotId, ref.snapshotId()); + Assert.assertNull(ref.maxRefAgeMs()); + + AssertHelpers.assertThrows( + "Cannot create an exist tag", + IllegalArgumentException.class, + "already exists", + () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName)); + + AssertHelpers.assertThrows( + "Non-conforming tag name", + IcebergParseException.class, + "mismatched input '123'", + () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, "123")); + + table.manageSnapshots().removeTag(tagName).commit(); + List records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + snapshotId = table.currentSnapshot().snapshotId(); + sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName, tagName, snapshotId); + table.refresh(); + ref = table.refs().get(tagName); + Assert.assertEquals(snapshotId, ref.snapshotId()); + Assert.assertNull(ref.maxRefAgeMs()); + } + + @Test + public void testCreateTagIfNotExists() throws NoSuchTableException { + long maxSnapshotAge = 2L; + Table table = insertRows(); + String tagName = "t1"; + sql("ALTER TABLE %s CREATE TAG %s RETAIN %d days", tableName, tagName, maxSnapshotAge); + sql("ALTER TABLE %s CREATE TAG IF NOT EXISTS %s", tableName, tagName); + + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId()); + Assert.assertEquals(TimeUnit.DAYS.toMillis(maxSnapshotAge), ref.maxRefAgeMs().longValue()); + } + + @Test + public void testReplaceTagFailsForBranch() throws NoSuchTableException { + String branchName = "branch1"; + Table table = insertRows(); + long first = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createBranch(branchName, first).commit(); + insertRows(); + long second = table.currentSnapshot().snapshotId(); + + AssertHelpers.assertThrows( + "Cannot perform replace tag on branches", + IllegalArgumentException.class, + "Ref branch1 is a branch not a tag", + () -> sql("ALTER TABLE %s REPLACE Tag %s", tableName, branchName, second)); + } + + @Test + public void testReplaceTag() throws NoSuchTableException { + Table table = insertRows(); + long first = table.currentSnapshot().snapshotId(); + String tagName = "t1"; + long expectedMaxRefAgeMs = 1000; + table + .manageSnapshots() + .createTag(tagName, first) + .setMaxRefAgeMs(tagName, expectedMaxRefAgeMs) + .commit(); + + insertRows(); + long second = table.currentSnapshot().snapshotId(); + + sql("ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d", tableName, tagName, second); + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals(second, ref.snapshotId()); + Assert.assertEquals(expectedMaxRefAgeMs, ref.maxRefAgeMs().longValue()); + } + + @Test + public void testReplaceTagDoesNotExist() throws NoSuchTableException { + Table table = insertRows(); + + AssertHelpers.assertThrows( + "Cannot perform replace tag on tag which does not exist", + IllegalArgumentException.class, + "Tag does not exist", + () -> + sql( + "ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d", + tableName, "someTag", table.currentSnapshot().snapshotId())); + } + + @Test + public void testReplaceTagWithRetain() throws NoSuchTableException { + Table table = insertRows(); + long first = table.currentSnapshot().snapshotId(); + String tagName = "t1"; + table.manageSnapshots().createTag(tagName, first).commit(); + insertRows(); + long second = table.currentSnapshot().snapshotId(); + + long maxRefAge = 10; + for (String timeUnit : TIME_UNITS) { + sql( + "ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d RETAIN %d %s", + tableName, tagName, second, maxRefAge, timeUnit); + + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals(second, ref.snapshotId()); + Assert.assertEquals( + TimeUnit.valueOf(timeUnit).toMillis(maxRefAge), ref.maxRefAgeMs().longValue()); + } + } + + @Test + public void testCreateOrReplace() throws NoSuchTableException { + Table table = insertRows(); + long first = table.currentSnapshot().snapshotId(); + String tagName = "t1"; + insertRows(); + long second = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createTag(tagName, second).commit(); + + sql("ALTER TABLE %s CREATE OR REPLACE TAG %s AS OF VERSION %d", tableName, tagName, first); + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals(first, ref.snapshotId()); + } + + private Table insertRows() throws NoSuchTableException { + List records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + return validationCatalog.loadTable(tableIdent); + } +}