From 41b4e04c924cf6daf77e3f2b647812f6eee26bc1 Mon Sep 17 00:00:00 2001 From: Liwei Li Date: Sat, 21 Jan 2023 00:04:59 +0800 Subject: [PATCH 1/5] Spark: Spark SQL Extensions for create tag Co-authored-by: Amogh Jahagirdar Co-authored-by: chidayong <247070443@qq.com> --- .../IcebergSqlExtensions.g4 | 13 +- .../IcebergSparkSqlExtensionsParser.scala | 5 +- .../IcebergSqlExtensionsAstBuilder.scala | 30 +++ .../catalyst/plans/logical/CreateTag.scala | 33 ++++ .../catalyst/plans/logical/TagOptions.scala | 22 +++ .../datasources/v2/CreateTagExec.scala | 66 +++++++ .../v2/ExtendedDataSourceV2Strategy.scala | 4 + .../spark/extensions/TestCreateTag.java | 187 ++++++++++++++++++ 8 files changed, 357 insertions(+), 3 deletions(-) create mode 100644 spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateTag.scala create mode 100644 spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TagOptions.scala create mode 100644 spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTagExec.scala create mode 100644 spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateTag.java diff --git a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 index d1ab06f852c8..1aaf8b0199cd 100644 --- a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 +++ b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 @@ -73,7 +73,12 @@ statement | ALTER TABLE multipartIdentifier WRITE writeSpec #setWriteDistributionAndOrdering | ALTER TABLE multipartIdentifier SET IDENTIFIER_KW FIELDS fieldList #setIdentifierFields | ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW 
FIELDS fieldList #dropIdentifierFields - | ALTER TABLE multipartIdentifier createReplaceBranchClause #createOrReplaceBranch + | ALTER TABLE multipartIdentifier createReplaceBranchClause #createOrReplaceBranch + | ALTER TABLE multipartIdentifier createTagClause #createTag + ; + +createTagClause + : CREATE TAG (IF NOT EXISTS)? identifier tagOptions ; createReplaceBranchClause @@ -81,6 +86,9 @@ createReplaceBranchClause | CREATE BRANCH (IF NOT EXISTS)? identifier branchOptions ; +tagOptions + : (AS OF VERSION snapshotId)? (refRetain)?; + branchOptions : (AS OF VERSION snapshotId)? (refRetain)? (snapshotRetention)?; @@ -197,7 +205,7 @@ fieldList nonReserved : ADD | ALTER | AS | ASC | BRANCH | BY | CALL | CREATE | DAYS | DESC | DROP | EXISTS | FIELD | FIRST | HOURS | IF | LAST | NOT | NULLS | OF | OR | ORDERED | PARTITION | TABLE | WRITE | DISTRIBUTED | LOCALLY | MINUTES | MONTHS | UNORDERED | REPLACE | RETAIN | VERSION | WITH | IDENTIFIER_KW | FIELDS | SET | SNAPSHOT | SNAPSHOTS - | TRUE | FALSE + | TAG | TRUE | FALSE | MAP ; @@ -251,6 +259,7 @@ SET: 'SET'; SNAPSHOT: 'SNAPSHOT'; SNAPSHOTS: 'SNAPSHOTS'; TABLE: 'TABLE'; +TAG: 'TAG'; UNORDERED: 'UNORDERED'; VERSION: 'VERSION'; WITH: 'WITH'; diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala index 8d3250a7de5b..946c10d193bf 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala @@ -210,7 +210,10 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI } private def isSnapshotRefDdl(normalized: String): Boolean = { - normalized.contains("create branch") || 
normalized.contains("replace branch") + normalized.contains("create branch") || + normalized.contains("replace branch") || + normalized.contains("create tag") || + normalized.contains("replace tag") } protected def parse[T](command: String)(toResult: IcebergSqlExtensionsParser => T): T = { diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala index 11c60b610cac..38c6c4d7ac27 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical.BranchOptions import org.apache.spark.sql.catalyst.plans.logical.CallArgument import org.apache.spark.sql.catalyst.plans.logical.CallStatement import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch +import org.apache.spark.sql.catalyst.plans.logical.CreateTag import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -49,6 +50,7 @@ import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering +import org.apache.spark.sql.catalyst.plans.logical.TagOptions import org.apache.spark.sql.catalyst.trees.CurrentOrigin import org.apache.spark.sql.catalyst.trees.Origin import org.apache.spark.sql.connector.expressions @@ -128,6 +130,34 @@ class 
IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS branchOptions, replace, ifNotExists) + + } + + /** + * Create an CREATE TAG logical command. + */ + override def visitCreateTag(ctx: CreateTagContext): CreateTag = withOrigin(ctx) { + val createTagClause = ctx.createTagClause() + + val tagName = createTagClause.identifier().getText + + val tagOptionsContext = Option(createTagClause.tagOptions()) + val snapshotId = tagOptionsContext.flatMap(tagOptions => Option(tagOptions.snapshotId())) + .map(_.getText.toLong) + val tagRetain = tagOptionsContext.flatMap(tagOptions => Option(tagOptions.refRetain())) + val tagRefAgeMs = tagRetain.map(retain => + TimeUnit.valueOf(retain.timeUnit().getText.toUpperCase(Locale.ENGLISH)).toMillis(retain.number().getText.toLong)) + val tagOptions = TagOptions( + snapshotId, + tagRefAgeMs + ) + + val ifNotExists = createTagClause.EXISTS() != null + + CreateTag(typedVisit[Seq[String]](ctx.multipartIdentifier), + tagName, + tagOptions, + ifNotExists) } /** diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateTag.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateTag.scala new file mode 100644 index 000000000000..84df5ac650f8 --- /dev/null +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateTag.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.Attribute + +case class CreateTag(table: Seq[String], tag: String, tagOptions: TagOptions,ifNotExists: Boolean) extends LeafCommand { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override def simpleString(maxFields: Int): String = { + s"Create tag: ${tag} for table: ${table.quoted}" + } +} diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TagOptions.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TagOptions.scala new file mode 100644 index 000000000000..85e3b95f4aba --- /dev/null +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TagOptions.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +case class TagOptions(snapshotId: Option[Long], snapshotRefRetain: Option[Long]) diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTagExec.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTagExec.scala new file mode 100644 index 000000000000..877a0c5f44a7 --- /dev/null +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTagExec.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.source.SparkTable +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.CreateTag +import org.apache.spark.sql.catalyst.plans.logical.TagOptions +import org.apache.spark.sql.connector.catalog._ + +case class CreateTagExec(catalog: TableCatalog, + ident: Identifier, + tag: String, + tagOptions: TagOptions, + ifNotExists: Boolean) extends LeafV2CommandExec { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + catalog.loadTable(ident) match { + case iceberg: SparkTable => + val ref = iceberg.table().refs().get(tag); + if (ref != null && ifNotExists) { + return Nil + } + + val snapshotId = tagOptions.snapshotId.getOrElse(iceberg.table.currentSnapshot().snapshotId()) + val manageSnapshot = iceberg.table.manageSnapshots() + .createTag(tag, snapshotId) + if(tagOptions.snapshotRefRetain.nonEmpty) { + manageSnapshot.setMaxRefAgeMs(tag, tagOptions.snapshotRefRetain.get) + } + + manageSnapshot.commit() + + case table => + throw new UnsupportedOperationException(s"Cannot create tag to non-Iceberg table: $table") + } + + Nil + } + + override def simpleString(maxFields: Int): String = { + s"Create tag: ${tag} for table: ${ident.quoted}" + } +} diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index 7e343534dede..d19201ac64ba 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ 
b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.PredicateHelper import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField import org.apache.spark.sql.catalyst.plans.logical.Call import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch +import org.apache.spark.sql.catalyst.plans.logical.CreateTag import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField @@ -66,6 +67,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi IcebergCatalogAndIdentifier(catalog, ident), branch, branchOptions, replace, ifNotExists) => CreateOrReplaceBranchExec(catalog, ident, branch, branchOptions, replace, ifNotExists) :: Nil + case CreateTag(IcebergCatalogAndIdentifier(catalog, ident), tag, tagOptions, ifNotExists) => + CreateTagExec(catalog, ident, tag, tagOptions, ifNotExists) :: Nil + case DropPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform) => DropPartitionFieldExec(catalog, ident, transform) :: Nil diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateTag.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateTag.java new file mode 100644 index 000000000000..87cd2c204e55 --- /dev/null +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateTag.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; +import org.junit.runners.Parameterized; + +public class TestCreateTag extends SparkExtensionsTestBase { + private final String timeUnit; + + @Parameterized.Parameters( + name = "catalogName = {0}, implementation = {1}, config = {2}, timeUnit = {3}") + public static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties(), + "days" + }, + { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + 
SparkCatalogConfig.SPARK.properties(), + "hours" + }, + { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties(), + "minutes" + } + }; + } + + public TestCreateTag( + String catalogName, String implementation, Map config, String timeUnit) { + super(catalogName, implementation, config); + this.timeUnit = timeUnit; + } + + @After + public void removeTable() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testCreateTag() throws NoSuchTableException { + Table table = createDefaultTableAndInsert2Row(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + + List records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + + String tagName = "t1"; + long maxRefAge = 10L; + sql( + "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d %s", + tableName, tagName, firstSnapshotId, maxRefAge, timeUnit); + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals(firstSnapshotId, ref.snapshotId()); + Assert.assertEquals( + TimeUnit.valueOf(timeUnit.toUpperCase(Locale.ENGLISH)).toMillis(maxRefAge), + ref.maxRefAgeMs().longValue()); + + AssertHelpers.assertThrows( + "Illegal statement", + IcebergParseException.class, + "mismatched input", + () -> + sql( + "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN", + tableName, tagName, firstSnapshotId, maxRefAge)); + + AssertHelpers.assertThrows( + "Illegal statement", + IcebergParseException.class, + "mismatched input 'SECONDS' expecting {'DAYS', 'HOURS', 'MINUTES'}", + () -> + sql( + "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d SECONDS", + tableName, tagName, firstSnapshotId, maxRefAge)); + } + + @Test + public void testCreateTagUseDefaultConfig() throws NoSuchTableException { + Assume.assumeFalse(!timeUnit.equalsIgnoreCase("days")); + + Table table = 
createDefaultTableAndInsert2Row(); + long snapshotId = table.currentSnapshot().snapshotId(); + String tagName = "t1"; + + AssertHelpers.assertThrows( + "unknown snapshot", + ValidationException.class, + "unknown snapshot: -1", + () -> sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName, tagName, -1)); + + sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName); + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals(snapshotId, ref.snapshotId()); + Assert.assertNull(ref.maxRefAgeMs()); + + AssertHelpers.assertThrows( + "Cannot create an exist tag", + IllegalArgumentException.class, + "already exists", + () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName)); + + table.manageSnapshots().removeTag(tagName).commit(); + List records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + snapshotId = table.currentSnapshot().snapshotId(); + sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName, tagName, snapshotId); + table.refresh(); + ref = table.refs().get(tagName); + Assert.assertEquals(snapshotId, ref.snapshotId()); + Assert.assertNull(ref.maxRefAgeMs()); + } + + @Test + public void testCreateTagIfNotExists() throws NoSuchTableException { + long maxSnapshotAge = 2L; + Table table = createDefaultTableAndInsert2Row(); + String tagName = "t1"; + sql("ALTER TABLE %s CREATE TAG %s RETAIN %d %s", tableName, tagName, maxSnapshotAge, timeUnit); + sql("ALTER TABLE %s CREATE TAG IF NOT EXISTS %s", tableName, tagName); + + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId()); + Assert.assertEquals( + TimeUnit.valueOf(timeUnit.toUpperCase(Locale.ENGLISH)).toMillis(maxSnapshotAge), + ref.maxRefAgeMs().longValue()); + } + + private Table createDefaultTableAndInsert2Row() throws NoSuchTableException { + sql("CREATE 
TABLE %s (id INT, data STRING) USING iceberg", tableName); + + List records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + Table table = validationCatalog.loadTable(tableIdent); + return table; + } +} From 72c1ee64375a9748800d88e55db877199ae0d8a7 Mon Sep 17 00:00:00 2001 From: Liwei Li Date: Fri, 3 Feb 2023 13:34:32 +0800 Subject: [PATCH 2/5] add replace --- .../IcebergSqlExtensions.g4 | 7 +- .../IcebergSqlExtensionsAstBuilder.scala | 13 +- ...eateTag.scala => CreateOrReplaceTag.scala} | 9 +- ...xec.scala => CreateOrReplaceTagExec.scala} | 32 +-- .../v2/ExtendedDataSourceV2Strategy.scala | 6 +- .../spark/extensions/TestCreateTag.java | 60 ++---- .../spark/extensions/TestReplaceTag.java | 186 ++++++++++++++++++ 7 files changed, 248 insertions(+), 65 deletions(-) rename spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/{CreateTag.scala => CreateOrReplaceTag.scala} (82%) rename spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/{CreateTagExec.scala => CreateOrReplaceTagExec.scala} (78%) create mode 100644 spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestReplaceTag.java diff --git a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 index 1aaf8b0199cd..ba62c7321069 100644 --- a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 +++ b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 @@ -74,11 +74,12 @@ statement | ALTER TABLE multipartIdentifier SET IDENTIFIER_KW FIELDS fieldList #setIdentifierFields | ALTER TABLE 
multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList #dropIdentifierFields | ALTER TABLE multipartIdentifier createReplaceBranchClause #createOrReplaceBranch - | ALTER TABLE multipartIdentifier createTagClause #createTag + | ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag ; -createTagClause - : CREATE TAG (IF NOT EXISTS)? identifier tagOptions +createReplaceTagClause + : (CREATE OR)? REPLACE TAG identifier tagOptions + | CREATE TAG (IF NOT EXISTS)? identifier tagOptions ; createReplaceBranchClause diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala index 38c6c4d7ac27..48a6862f1be8 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical.BranchOptions import org.apache.spark.sql.catalyst.plans.logical.CallArgument import org.apache.spark.sql.catalyst.plans.logical.CallStatement import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch -import org.apache.spark.sql.catalyst.plans.logical.CreateTag +import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -133,11 +133,12 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS } + /** - * Create an CREATE TAG logical command. + * Create a CREATE OR REPLACE TAG logical command.
*/ - override def visitCreateTag(ctx: CreateTagContext): CreateTag = withOrigin(ctx) { - val createTagClause = ctx.createTagClause() + override def visitCreateOrReplaceTag(ctx: CreateOrReplaceTagContext): CreateOrReplaceTag = withOrigin(ctx) { + val createTagClause = ctx.createReplaceTagClause() val tagName = createTagClause.identifier().getText @@ -152,11 +153,13 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS tagRefAgeMs ) + val replace = createTagClause.REPLACE() != null val ifNotExists = createTagClause.EXISTS() != null - CreateTag(typedVisit[Seq[String]](ctx.multipartIdentifier), + CreateOrReplaceTag(typedVisit[Seq[String]](ctx.multipartIdentifier), tagName, tagOptions, + replace, ifNotExists) } diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateTag.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala similarity index 82% rename from spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateTag.scala rename to spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala index 84df5ac650f8..e48f7d8ed04c 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateTag.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala @@ -21,13 +21,18 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.expressions.Attribute -case class CreateTag(table: Seq[String], tag: String, tagOptions: TagOptions,ifNotExists: Boolean) extends LeafCommand { +case class CreateOrReplaceTag( + table: Seq[String], + tag: String, + tagOptions: TagOptions, + replace: Boolean, + ifNotExists: Boolean) extends LeafCommand { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ override lazy val 
output: Seq[Attribute] = Nil override def simpleString(maxFields: Int): String = { - s"Create tag: ${tag} for table: ${table.quoted}" + s"CreateOrReplaceTag tag: ${tag} for table: ${table.quoted}" } } diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTagExec.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala similarity index 78% rename from spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTagExec.scala rename to spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala index 877a0c5f44a7..d41f9f03ff4c 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTagExec.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala @@ -22,15 +22,16 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.iceberg.spark.source.SparkTable import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.CreateTag import org.apache.spark.sql.catalyst.plans.logical.TagOptions import org.apache.spark.sql.connector.catalog._ -case class CreateTagExec(catalog: TableCatalog, - ident: Identifier, - tag: String, - tagOptions: TagOptions, - ifNotExists: Boolean) extends LeafV2CommandExec { +case class CreateOrReplaceTagExec( + catalog: TableCatalog, + ident: Identifier, + tag: String, + tagOptions: TagOptions, + replace: Boolean, + ifNotExists: Boolean) extends LeafV2CommandExec { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ @@ -39,15 +40,20 @@ case class CreateTagExec(catalog: TableCatalog, override protected def run(): Seq[InternalRow] = { catalog.loadTable(ident) match { case iceberg: SparkTable 
=> - val ref = iceberg.table().refs().get(tag); - if (ref != null && ifNotExists) { - return Nil - } - val snapshotId = tagOptions.snapshotId.getOrElse(iceberg.table.currentSnapshot().snapshotId()) val manageSnapshot = iceberg.table.manageSnapshots() - .createTag(tag, snapshotId) - if(tagOptions.snapshotRefRetain.nonEmpty) { + if (!replace) { + val ref = iceberg.table().refs().get(tag); + if (ref != null && ifNotExists) { + return Nil + } + + manageSnapshot.createTag(tag, snapshotId) + } else { + manageSnapshot.replaceTag(tag, snapshotId) + } + + if (tagOptions.snapshotRefRetain.nonEmpty) { manageSnapshot.setMaxRefAgeMs(tag, tagOptions.snapshotRefRetain.get) } diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index d19201ac64ba..6c7ededf88bb 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.PredicateHelper import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField import org.apache.spark.sql.catalyst.plans.logical.Call import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch -import org.apache.spark.sql.catalyst.plans.logical.CreateTag +import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField @@ -67,8 +67,8 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi IcebergCatalogAndIdentifier(catalog, 
ident), branch, branchOptions, replace, ifNotExists) => CreateOrReplaceBranchExec(catalog, ident, branch, branchOptions, replace, ifNotExists) :: Nil - case CreateTag(IcebergCatalogAndIdentifier(catalog, ident), tag, tagOptions, ifNotExists) => - CreateTagExec(catalog, ident, tag, tagOptions, ifNotExists) :: Nil + case CreateOrReplaceTag(IcebergCatalogAndIdentifier(catalog, ident), tag, tagOptions, replace, ifNotExists) => + CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, replace, ifNotExists) :: Nil case DropPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform) => DropPartitionFieldExec(catalog, ident, transform) :: Nil diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateTag.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateTag.java index 87cd2c204e55..f0c6901bd97f 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateTag.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateTag.java @@ -35,42 +35,25 @@ import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException; import org.junit.After; import org.junit.Assert; -import org.junit.Assume; import org.junit.Test; import org.junit.runners.Parameterized; public class TestCreateTag extends SparkExtensionsTestBase { - private final String timeUnit; + private static final String[] TIME_UNITS = {"DAYS", "HOURS", "MINUTES"}; - @Parameterized.Parameters( - name = "catalogName = {0}, implementation = {1}, config = {2}, timeUnit = {3}") + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") public static Object[][] parameters() { return new Object[][] { { SparkCatalogConfig.SPARK.catalogName(), SparkCatalogConfig.SPARK.implementation(), - SparkCatalogConfig.SPARK.properties(), - "days" - }, - { - SparkCatalogConfig.SPARK.catalogName(), - 
SparkCatalogConfig.SPARK.implementation(), - SparkCatalogConfig.SPARK.properties(), - "hours" - }, - { - SparkCatalogConfig.SPARK.catalogName(), - SparkCatalogConfig.SPARK.implementation(), - SparkCatalogConfig.SPARK.properties(), - "minutes" + SparkCatalogConfig.SPARK.properties() } }; } - public TestCreateTag( - String catalogName, String implementation, Map config, String timeUnit) { + public TestCreateTag(String catalogName, String implementation, Map config) { super(catalogName, implementation, config); - this.timeUnit = timeUnit; } @After @@ -79,27 +62,30 @@ public void removeTable() { } @Test - public void testCreateTag() throws NoSuchTableException { + public void testCreateTagWithRetain() throws NoSuchTableException { Table table = createDefaultTableAndInsert2Row(); long firstSnapshotId = table.currentSnapshot().snapshotId(); + long maxRefAge = 10L; List records = ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); Dataset df = spark.createDataFrame(records, SimpleRecord.class); df.writeTo(tableName).append(); - String tagName = "t1"; - long maxRefAge = 10L; - sql( - "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d %s", - tableName, tagName, firstSnapshotId, maxRefAge, timeUnit); - table.refresh(); - SnapshotRef ref = table.refs().get(tagName); - Assert.assertEquals(firstSnapshotId, ref.snapshotId()); - Assert.assertEquals( - TimeUnit.valueOf(timeUnit.toUpperCase(Locale.ENGLISH)).toMillis(maxRefAge), - ref.maxRefAgeMs().longValue()); + for (String timeUnit : TIME_UNITS) { + String tagName = "t1" + timeUnit; + sql( + "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d %s", + tableName, tagName, firstSnapshotId, maxRefAge, timeUnit); + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals(firstSnapshotId, ref.snapshotId()); + Assert.assertEquals( + TimeUnit.valueOf(timeUnit.toUpperCase(Locale.ENGLISH)).toMillis(maxRefAge), + ref.maxRefAgeMs().longValue()); + } + String tagName = "t1"; 
AssertHelpers.assertThrows( "Illegal statement", IcebergParseException.class, @@ -121,8 +107,6 @@ public void testCreateTag() throws NoSuchTableException { @Test public void testCreateTagUseDefaultConfig() throws NoSuchTableException { - Assume.assumeFalse(!timeUnit.equalsIgnoreCase("days")); - Table table = createDefaultTableAndInsert2Row(); long snapshotId = table.currentSnapshot().snapshotId(); String tagName = "t1"; @@ -163,15 +147,13 @@ public void testCreateTagIfNotExists() throws NoSuchTableException { long maxSnapshotAge = 2L; Table table = createDefaultTableAndInsert2Row(); String tagName = "t1"; - sql("ALTER TABLE %s CREATE TAG %s RETAIN %d %s", tableName, tagName, maxSnapshotAge, timeUnit); + sql("ALTER TABLE %s CREATE TAG %s RETAIN %d days", tableName, tagName, maxSnapshotAge); sql("ALTER TABLE %s CREATE TAG IF NOT EXISTS %s", tableName, tagName); table.refresh(); SnapshotRef ref = table.refs().get(tagName); Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId()); - Assert.assertEquals( - TimeUnit.valueOf(timeUnit.toUpperCase(Locale.ENGLISH)).toMillis(maxSnapshotAge), - ref.maxRefAgeMs().longValue()); + Assert.assertEquals(TimeUnit.DAYS.toMillis(maxSnapshotAge), ref.maxRefAgeMs().longValue()); } private Table createDefaultTableAndInsert2Row() throws NoSuchTableException { diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestReplaceTag.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestReplaceTag.java new file mode 100644 index 000000000000..49c03a888c12 --- /dev/null +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestReplaceTag.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runners.Parameterized; + +public class TestReplaceTag extends SparkExtensionsTestBase { + + private static final String[] TIME_UNITS = {"DAYS", "HOURS", "MINUTES"}; + + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + public static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties() + } + }; + } + + public TestReplaceTag(String catalogName, String implementation, Map config) { + super(catalogName, implementation, config); + } + + @After + public void removeTable() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void 
testReplaceTagFailsForBranch() throws NoSuchTableException { + sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); + String branchName = "branch1"; + + List records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + Table table = validationCatalog.loadTable(tableIdent); + long first = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createBranch(branchName, first).commit(); + df.writeTo(tableName).append(); + long second = table.currentSnapshot().snapshotId(); + + AssertHelpers.assertThrows( + "Cannot perform replace tag on branches", + IllegalArgumentException.class, + "Ref branch1 is a branch not a tag", + () -> sql("ALTER TABLE %s REPLACE Tag %s", tableName, branchName, second)); + } + + @Test + public void testReplaceTag() throws NoSuchTableException { + sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); + List records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + + Table table = validationCatalog.loadTable(tableIdent); + long first = table.currentSnapshot().snapshotId(); + String tagName = "t1"; + long expectedMaxRefAgeMs = 1000; + table + .manageSnapshots() + .createTag(tagName, first) + .setMaxRefAgeMs(tagName, expectedMaxRefAgeMs) + .commit(); + + df.writeTo(tableName).append(); + long second = table.currentSnapshot().snapshotId(); + + sql("ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d", tableName, tagName, second); + + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertNotNull(ref); + Assert.assertEquals(second, ref.snapshotId()); + Assert.assertEquals(expectedMaxRefAgeMs, ref.maxRefAgeMs().longValue()); + } + + @Test + public void testReplaceTagDoesNotExist() throws NoSuchTableException { + sql("CREATE TABLE %s (id INT, 
data STRING) USING iceberg", tableName); + List records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + Table table = validationCatalog.loadTable(tableIdent); + + AssertHelpers.assertThrows( + "Cannot perform replace tag on tag which does not exist", + IllegalArgumentException.class, + "Tag does not exist", + () -> + sql( + "ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d", + tableName, "someTag", table.currentSnapshot().snapshotId())); + } + + @Test + public void testReplaceTagWithRetain() throws NoSuchTableException { + sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); + List records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + + Table table = validationCatalog.loadTable(tableIdent); + long first = table.currentSnapshot().snapshotId(); + String tagName = "t1"; + table.manageSnapshots().createTag(tagName, first).commit(); + SnapshotRef t1 = table.refs().get(tagName); + df.writeTo(tableName).append(); + long second = table.currentSnapshot().snapshotId(); + + long maxRefAge = 10; + for (String timeUnit : TIME_UNITS) { + sql( + "ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d RETAIN %d %s", + tableName, tagName, second, maxRefAge, timeUnit); + + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertNotNull(ref); + Assert.assertEquals(second, ref.snapshotId()); + Assert.assertEquals( + TimeUnit.valueOf(timeUnit).toMillis(maxRefAge), ref.maxRefAgeMs().longValue()); + } + } + + @Test + public void testCreateOrReplace() throws NoSuchTableException { + sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); + List records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset df = spark.createDataFrame(records, 
SimpleRecord.class); + df.writeTo(tableName).append(); + + Table table = validationCatalog.loadTable(tableIdent); + long first = table.currentSnapshot().snapshotId(); + String tagName = "t1"; + df.writeTo(tableName).append(); + long second = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createTag(tagName, second).commit(); + + sql("ALTER TABLE %s CREATE OR REPLACE TAG %s AS OF VERSION %d", tableName, tagName, first); + + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertNotNull(ref); + Assert.assertEquals(first, ref.snapshotId()); + } +} From f51effaecea1c491e8f10738e04274ab91eb1876 Mon Sep 17 00:00:00 2001 From: Liwei Li Date: Fri, 3 Feb 2023 15:42:42 +0800 Subject: [PATCH 3/5] address comments --- .../IcebergSqlExtensions.g4 | 6 ++++-- .../extensions/IcebergSqlExtensionsAstBuilder.scala | 2 -- .../iceberg/spark/extensions/TestCreateTag.java | 12 ++++++++++++ .../iceberg/spark/extensions/TestReplaceTag.java | 6 ------ 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 index ba62c7321069..ef070057a3da 100644 --- a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 +++ b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 @@ -88,10 +88,12 @@ createReplaceBranchClause ; tagOptions - : (AS OF VERSION snapshotId)? (refRetain)?; + : (AS OF VERSION snapshotId)? (refRetain)? + ; branchOptions - : (AS OF VERSION snapshotId)? (refRetain)? (snapshotRetention)?; + : (AS OF VERSION snapshotId)? (refRetain)? (snapshotRetention)? 
+ ; snapshotRetention : WITH SNAPSHOT RETENTION minSnapshotsToKeep diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala index 48a6862f1be8..b02897aa7664 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala @@ -130,10 +130,8 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS branchOptions, replace, ifNotExists) - } - /** * Create an CREATE OR REPLACE TAG logical command. */ diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateTag.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateTag.java index f0c6901bd97f..b6bd04025bc1 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateTag.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateTag.java @@ -95,6 +95,12 @@ public void testCreateTagWithRetain() throws NoSuchTableException { "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN", tableName, tagName, firstSnapshotId, maxRefAge)); + AssertHelpers.assertThrows( + "Illegal statement", + IcebergParseException.class, + "mismatched input", + () -> sql("ALTER TABLE %s CREATE TAG %s RETAIN %s DAYS", tableName, tagName, "abc")); + AssertHelpers.assertThrows( "Illegal statement", IcebergParseException.class, @@ -129,6 +135,12 @@ public void testCreateTagUseDefaultConfig() throws NoSuchTableException { "already exists", () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName)); + AssertHelpers.assertThrows( + "Non-conforming tag 
name", + IcebergParseException.class, + "mismatched input '123'", + () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, "123")); + table.manageSnapshots().removeTag(tagName).commit(); List records = ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestReplaceTag.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestReplaceTag.java index 49c03a888c12..61b021de46be 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestReplaceTag.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestReplaceTag.java @@ -103,10 +103,8 @@ public void testReplaceTag() throws NoSuchTableException { long second = table.currentSnapshot().snapshotId(); sql("ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d", tableName, tagName, second); - table.refresh(); SnapshotRef ref = table.refs().get(tagName); - Assert.assertNotNull(ref); Assert.assertEquals(second, ref.snapshotId()); Assert.assertEquals(expectedMaxRefAgeMs, ref.maxRefAgeMs().longValue()); } @@ -142,7 +140,6 @@ public void testReplaceTagWithRetain() throws NoSuchTableException { long first = table.currentSnapshot().snapshotId(); String tagName = "t1"; table.manageSnapshots().createTag(tagName, first).commit(); - SnapshotRef t1 = table.refs().get(tagName); df.writeTo(tableName).append(); long second = table.currentSnapshot().snapshotId(); @@ -154,7 +151,6 @@ public void testReplaceTagWithRetain() throws NoSuchTableException { table.refresh(); SnapshotRef ref = table.refs().get(tagName); - Assert.assertNotNull(ref); Assert.assertEquals(second, ref.snapshotId()); Assert.assertEquals( TimeUnit.valueOf(timeUnit).toMillis(maxRefAge), ref.maxRefAgeMs().longValue()); @@ -177,10 +173,8 @@ public void testCreateOrReplace() throws NoSuchTableException { table.manageSnapshots().createTag(tagName, second).commit(); sql("ALTER 
TABLE %s CREATE OR REPLACE TAG %s AS OF VERSION %d", tableName, tagName, first); - table.refresh(); SnapshotRef ref = table.refs().get(tagName); - Assert.assertNotNull(ref); Assert.assertEquals(first, ref.snapshotId()); } } From 68c0353d6e963a415d73269480d8ed2e9821460c Mon Sep 17 00:00:00 2001 From: Liwei Li Date: Fri, 10 Feb 2023 10:21:03 +0800 Subject: [PATCH 4/5] test class name --- .../spark/extensions/TestCreateTag.java | 181 ------------------ .../{TestReplaceTag.java => TestTagDDL.java} | 134 ++++++++++++- 2 files changed, 126 insertions(+), 189 deletions(-) delete mode 100644 spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateTag.java rename spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/{TestReplaceTag.java => TestTagDDL.java} (57%) diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateTag.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateTag.java deleted file mode 100644 index b6bd04025bc1..000000000000 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateTag.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.spark.extensions; - -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import org.apache.iceberg.AssertHelpers; -import org.apache.iceberg.SnapshotRef; -import org.apache.iceberg.Table; -import org.apache.iceberg.exceptions.ValidationException; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.spark.SparkCatalogConfig; -import org.apache.iceberg.spark.source.SimpleRecord; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; -import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException; -import org.junit.After; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runners.Parameterized; - -public class TestCreateTag extends SparkExtensionsTestBase { - private static final String[] TIME_UNITS = {"DAYS", "HOURS", "MINUTES"}; - - @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") - public static Object[][] parameters() { - return new Object[][] { - { - SparkCatalogConfig.SPARK.catalogName(), - SparkCatalogConfig.SPARK.implementation(), - SparkCatalogConfig.SPARK.properties() - } - }; - } - - public TestCreateTag(String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } - - @After - public void removeTable() { - sql("DROP TABLE IF EXISTS %s", tableName); - } - - @Test - public void testCreateTagWithRetain() throws NoSuchTableException { - Table table = createDefaultTableAndInsert2Row(); - long firstSnapshotId = table.currentSnapshot().snapshotId(); - long maxRefAge = 10L; - - List records = - ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); - Dataset df = spark.createDataFrame(records, 
SimpleRecord.class); - df.writeTo(tableName).append(); - - for (String timeUnit : TIME_UNITS) { - String tagName = "t1" + timeUnit; - sql( - "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d %s", - tableName, tagName, firstSnapshotId, maxRefAge, timeUnit); - table.refresh(); - SnapshotRef ref = table.refs().get(tagName); - Assert.assertEquals(firstSnapshotId, ref.snapshotId()); - Assert.assertEquals( - TimeUnit.valueOf(timeUnit.toUpperCase(Locale.ENGLISH)).toMillis(maxRefAge), - ref.maxRefAgeMs().longValue()); - } - - String tagName = "t1"; - AssertHelpers.assertThrows( - "Illegal statement", - IcebergParseException.class, - "mismatched input", - () -> - sql( - "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN", - tableName, tagName, firstSnapshotId, maxRefAge)); - - AssertHelpers.assertThrows( - "Illegal statement", - IcebergParseException.class, - "mismatched input", - () -> sql("ALTER TABLE %s CREATE TAG %s RETAIN %s DAYS", tableName, tagName, "abc")); - - AssertHelpers.assertThrows( - "Illegal statement", - IcebergParseException.class, - "mismatched input 'SECONDS' expecting {'DAYS', 'HOURS', 'MINUTES'}", - () -> - sql( - "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d SECONDS", - tableName, tagName, firstSnapshotId, maxRefAge)); - } - - @Test - public void testCreateTagUseDefaultConfig() throws NoSuchTableException { - Table table = createDefaultTableAndInsert2Row(); - long snapshotId = table.currentSnapshot().snapshotId(); - String tagName = "t1"; - - AssertHelpers.assertThrows( - "unknown snapshot", - ValidationException.class, - "unknown snapshot: -1", - () -> sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName, tagName, -1)); - - sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName); - table.refresh(); - SnapshotRef ref = table.refs().get(tagName); - Assert.assertEquals(snapshotId, ref.snapshotId()); - Assert.assertNull(ref.maxRefAgeMs()); - - AssertHelpers.assertThrows( - "Cannot create an exist tag", - 
IllegalArgumentException.class, - "already exists", - () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName)); - - AssertHelpers.assertThrows( - "Non-conforming tag name", - IcebergParseException.class, - "mismatched input '123'", - () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, "123")); - - table.manageSnapshots().removeTag(tagName).commit(); - List records = - ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); - Dataset df = spark.createDataFrame(records, SimpleRecord.class); - df.writeTo(tableName).append(); - snapshotId = table.currentSnapshot().snapshotId(); - sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName, tagName, snapshotId); - table.refresh(); - ref = table.refs().get(tagName); - Assert.assertEquals(snapshotId, ref.snapshotId()); - Assert.assertNull(ref.maxRefAgeMs()); - } - - @Test - public void testCreateTagIfNotExists() throws NoSuchTableException { - long maxSnapshotAge = 2L; - Table table = createDefaultTableAndInsert2Row(); - String tagName = "t1"; - sql("ALTER TABLE %s CREATE TAG %s RETAIN %d days", tableName, tagName, maxSnapshotAge); - sql("ALTER TABLE %s CREATE TAG IF NOT EXISTS %s", tableName, tagName); - - table.refresh(); - SnapshotRef ref = table.refs().get(tagName); - Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId()); - Assert.assertEquals(TimeUnit.DAYS.toMillis(maxSnapshotAge), ref.maxRefAgeMs().longValue()); - } - - private Table createDefaultTableAndInsert2Row() throws NoSuchTableException { - sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); - - List records = - ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); - Dataset df = spark.createDataFrame(records, SimpleRecord.class); - df.writeTo(tableName).append(); - Table table = validationCatalog.loadTable(tableIdent); - return table; - } -} diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestReplaceTag.java 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java similarity index 57% rename from spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestReplaceTag.java rename to spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java index 61b021de46be..cd4e87fd7367 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestReplaceTag.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java @@ -19,24 +19,27 @@ package org.apache.iceberg.spark.extensions; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.spark.SparkCatalogConfig; import org.apache.iceberg.spark.source.SimpleRecord; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runners.Parameterized; -public class TestReplaceTag extends SparkExtensionsTestBase { - +public class TestTagDDL extends SparkExtensionsTestBase { private static final String[] TIME_UNITS = {"DAYS", "HOURS", "MINUTES"}; @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") @@ -50,18 +53,129 @@ public static Object[][] parameters() { }; } - public TestReplaceTag(String catalogName, String implementation, Map config) { + public TestTagDDL(String catalogName, String implementation, Map config) { super(catalogName, 
implementation, config); } + @Before + public void before() { + sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); + } + @After public void removeTable() { sql("DROP TABLE IF EXISTS %s", tableName); } + @Test + public void testCreateTagWithRetain() throws NoSuchTableException { + Table table = insertRows(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + long maxRefAge = 10L; + + List records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + + for (String timeUnit : TIME_UNITS) { + String tagName = "t1" + timeUnit; + sql( + "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d %s", + tableName, tagName, firstSnapshotId, maxRefAge, timeUnit); + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals(firstSnapshotId, ref.snapshotId()); + Assert.assertEquals( + TimeUnit.valueOf(timeUnit.toUpperCase(Locale.ENGLISH)).toMillis(maxRefAge), + ref.maxRefAgeMs().longValue()); + } + + String tagName = "t1"; + AssertHelpers.assertThrows( + "Illegal statement", + IcebergParseException.class, + "mismatched input", + () -> + sql( + "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN", + tableName, tagName, firstSnapshotId, maxRefAge)); + + AssertHelpers.assertThrows( + "Illegal statement", + IcebergParseException.class, + "mismatched input", + () -> sql("ALTER TABLE %s CREATE TAG %s RETAIN %s DAYS", tableName, tagName, "abc")); + + AssertHelpers.assertThrows( + "Illegal statement", + IcebergParseException.class, + "mismatched input 'SECONDS' expecting {'DAYS', 'HOURS', 'MINUTES'}", + () -> + sql( + "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d SECONDS", + tableName, tagName, firstSnapshotId, maxRefAge)); + } + + @Test + public void testCreateTagUseDefaultConfig() throws NoSuchTableException { + Table table = insertRows(); + long snapshotId = 
table.currentSnapshot().snapshotId(); + String tagName = "t1"; + + AssertHelpers.assertThrows( + "unknown snapshot", + ValidationException.class, + "unknown snapshot: -1", + () -> sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName, tagName, -1)); + + sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName); + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals(snapshotId, ref.snapshotId()); + Assert.assertNull(ref.maxRefAgeMs()); + + AssertHelpers.assertThrows( + "Cannot create an exist tag", + IllegalArgumentException.class, + "already exists", + () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName)); + + AssertHelpers.assertThrows( + "Non-conforming tag name", + IcebergParseException.class, + "mismatched input '123'", + () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, "123")); + + table.manageSnapshots().removeTag(tagName).commit(); + List records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + snapshotId = table.currentSnapshot().snapshotId(); + sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName, tagName, snapshotId); + table.refresh(); + ref = table.refs().get(tagName); + Assert.assertEquals(snapshotId, ref.snapshotId()); + Assert.assertNull(ref.maxRefAgeMs()); + } + + @Test + public void testCreateTagIfNotExists() throws NoSuchTableException { + long maxSnapshotAge = 2L; + Table table = insertRows(); + String tagName = "t1"; + sql("ALTER TABLE %s CREATE TAG %s RETAIN %d days", tableName, tagName, maxSnapshotAge); + sql("ALTER TABLE %s CREATE TAG IF NOT EXISTS %s", tableName, tagName); + + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId()); + Assert.assertEquals(TimeUnit.DAYS.toMillis(maxSnapshotAge), ref.maxRefAgeMs().longValue()); + } + @Test public void 
testReplaceTagFailsForBranch() throws NoSuchTableException { - sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); String branchName = "branch1"; List records = @@ -83,7 +197,6 @@ public void testReplaceTagFailsForBranch() throws NoSuchTableException { @Test public void testReplaceTag() throws NoSuchTableException { - sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); List records = ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); Dataset df = spark.createDataFrame(records, SimpleRecord.class); @@ -111,7 +224,6 @@ public void testReplaceTag() throws NoSuchTableException { @Test public void testReplaceTagDoesNotExist() throws NoSuchTableException { - sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); List records = ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); Dataset df = spark.createDataFrame(records, SimpleRecord.class); @@ -130,7 +242,6 @@ public void testReplaceTagDoesNotExist() throws NoSuchTableException { @Test public void testReplaceTagWithRetain() throws NoSuchTableException { - sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); List records = ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); Dataset df = spark.createDataFrame(records, SimpleRecord.class); @@ -159,7 +270,6 @@ public void testReplaceTagWithRetain() throws NoSuchTableException { @Test public void testCreateOrReplace() throws NoSuchTableException { - sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); List records = ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); Dataset df = spark.createDataFrame(records, SimpleRecord.class); @@ -177,4 +287,12 @@ public void testCreateOrReplace() throws NoSuchTableException { SnapshotRef ref = table.refs().get(tagName); Assert.assertEquals(first, ref.snapshotId()); } + + private Table insertRows() throws NoSuchTableException { + List records = + ImmutableList.of(new 
SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + return validationCatalog.loadTable(tableIdent); + } } From 463d04214c873d969c50f363c7ef6d4363e745e2 Mon Sep 17 00:00:00 2001 From: Liwei Li Date: Fri, 10 Feb 2023 13:36:04 +0800 Subject: [PATCH 5/5] improve ut --- .../iceberg/spark/extensions/TestTagDDL.java | 42 ++++--------------- 1 file changed, 9 insertions(+), 33 deletions(-) diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java index cd4e87fd7367..def040c15e59 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java @@ -177,15 +177,10 @@ public void testCreateTagIfNotExists() throws NoSuchTableException { @Test public void testReplaceTagFailsForBranch() throws NoSuchTableException { String branchName = "branch1"; - - List records = - ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); - Dataset df = spark.createDataFrame(records, SimpleRecord.class); - df.writeTo(tableName).append(); - Table table = validationCatalog.loadTable(tableIdent); + Table table = insertRows(); long first = table.currentSnapshot().snapshotId(); table.manageSnapshots().createBranch(branchName, first).commit(); - df.writeTo(tableName).append(); + insertRows(); long second = table.currentSnapshot().snapshotId(); AssertHelpers.assertThrows( @@ -197,12 +192,7 @@ public void testReplaceTagFailsForBranch() throws NoSuchTableException { @Test public void testReplaceTag() throws NoSuchTableException { - List records = - ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); - Dataset df = spark.createDataFrame(records, SimpleRecord.class); - 
df.writeTo(tableName).append(); - - Table table = validationCatalog.loadTable(tableIdent); + Table table = insertRows(); long first = table.currentSnapshot().snapshotId(); String tagName = "t1"; long expectedMaxRefAgeMs = 1000; @@ -212,7 +202,7 @@ public void testReplaceTag() throws NoSuchTableException { .setMaxRefAgeMs(tagName, expectedMaxRefAgeMs) .commit(); - df.writeTo(tableName).append(); + insertRows(); long second = table.currentSnapshot().snapshotId(); sql("ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d", tableName, tagName, second); @@ -224,11 +214,7 @@ public void testReplaceTag() throws NoSuchTableException { @Test public void testReplaceTagDoesNotExist() throws NoSuchTableException { - List records = - ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); - Dataset df = spark.createDataFrame(records, SimpleRecord.class); - df.writeTo(tableName).append(); - Table table = validationCatalog.loadTable(tableIdent); + Table table = insertRows(); AssertHelpers.assertThrows( "Cannot perform replace tag on tag which does not exist", @@ -242,16 +228,11 @@ public void testReplaceTagDoesNotExist() throws NoSuchTableException { @Test public void testReplaceTagWithRetain() throws NoSuchTableException { - List records = - ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); - Dataset df = spark.createDataFrame(records, SimpleRecord.class); - df.writeTo(tableName).append(); - - Table table = validationCatalog.loadTable(tableIdent); + Table table = insertRows(); long first = table.currentSnapshot().snapshotId(); String tagName = "t1"; table.manageSnapshots().createTag(tagName, first).commit(); - df.writeTo(tableName).append(); + insertRows(); long second = table.currentSnapshot().snapshotId(); long maxRefAge = 10; @@ -270,15 +251,10 @@ public void testReplaceTagWithRetain() throws NoSuchTableException { @Test public void testCreateOrReplace() throws NoSuchTableException { - List records = - ImmutableList.of(new SimpleRecord(1, "a"), new 
SimpleRecord(2, "b")); - Dataset df = spark.createDataFrame(records, SimpleRecord.class); - df.writeTo(tableName).append(); - - Table table = validationCatalog.loadTable(tableIdent); + Table table = insertRows(); long first = table.currentSnapshot().snapshotId(); String tagName = "t1"; - df.writeTo(tableName).append(); + insertRows(); long second = table.currentSnapshot().snapshotId(); table.manageSnapshots().createTag(tagName, second).commit();