Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ statement
| ALTER TABLE multipartIdentifier createReplaceBranchClause #createOrReplaceBranch
| ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag
| ALTER TABLE multipartIdentifier DROP BRANCH (IF EXISTS)? identifier #dropBranch
| ALTER TABLE multipartIdentifier DROP TAG (IF EXISTS)? identifier #dropTag
Comment on lines 78 to +79
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be written like:

Suggested change
| ALTER TABLE multipartIdentifier DROP BRANCH (IF EXISTS)? identifier #dropBranch
| ALTER TABLE multipartIdentifier DROP TAG (IF EXISTS)? identifier #dropTag
| ALTER TABLE multipartIdentifier DROP (BRANCH | TAG) (IF EXISTS)? identifier #dropBranchOrTag

Copy link
Contributor

@jackye1995 jackye1995 Feb 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we discussed this in #6637 (comment), and the conclusion was that it is more clear to have them separated.

The current way for separated DROP BRANCH and DROP TAG makes the logic consistent with CREATE.

Let me know what you think!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I was mostly coming from the Spark code base, considering this example:

https://github.com/apache/spark/blob/46a234125d3f125ba1f9ccd6af0ec1ba61016c1e/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4#L125

If we have reached consensus, then it should be fine.

;

createReplaceTagClause
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
normalized.contains("replace branch") ||
normalized.contains("create tag") ||
normalized.contains("replace tag") ||
normalized.contains("drop branch")
normalized.contains("drop branch") ||
normalized.contains("drop tag")
}

protected def parse[T](command: String)(toResult: IcebergSqlExtensionsParser => T): T = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag
import org.apache.spark.sql.catalyst.plans.logical.DropBranch
import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
import org.apache.spark.sql.catalyst.plans.logical.DropTag
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.NamedArgument
import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument
Expand Down Expand Up @@ -169,6 +170,13 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
DropBranch(typedVisit[Seq[String]](ctx.multipartIdentifier), ctx.identifier().getText, ctx.EXISTS() != null)
}

/**
 * Create a DROP TAG logical command.
 *
 * The table name comes from the multipart identifier, the tag name from the
 * identifier text, and `ifExists` is set when the EXISTS token is present.
 */
override def visitDropTag(ctx: DropTagContext): DropTag = withOrigin(ctx) {
  val tableName = typedVisit[Seq[String]](ctx.multipartIdentifier)
  val tagName = ctx.identifier().getText
  val ifExists = ctx.EXISTS() != null
  DropTag(tableName, tagName, ifExists)
}

/**
* Create a REPLACE PARTITION FIELD logical command.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.Attribute

/**
 * Logical command describing the removal of a tag from a table.
 *
 * @param table multipart name of the table the tag belongs to
 * @param tag name of the tag to drop
 * @param ifExists flag carried over from the statement's optional IF EXISTS clause
 */
case class DropTag(table: Seq[String], tag: String, ifExists: Boolean) extends LeafCommand {

  // Brings the `quoted` extension on Seq[String] into scope.
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

  // The command produces no output attributes.
  override lazy val output: Seq[Attribute] = Seq.empty

  override def simpleString(maxFields: Int): String =
    s"DropTag tag: ${tag} for table: ${table.quoted}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableCatalog

/**
 * Physical command that removes a tag from an Iceberg table.
 *
 * @param catalog catalog used to load the table
 * @param ident identifier of the table inside `catalog`
 * @param tag name of the tag to remove
 * @param ifExists when true, a tag that is absent from the table's refs is silently skipped
 */
case class DropTagExec(
    catalog: TableCatalog,
    ident: Identifier,
    tag: String,
    ifExists: Boolean) extends LeafV2CommandExec {

  // Brings the `quoted` extension on Identifier into scope.
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

  override lazy val output: Seq[Attribute] = Nil

  /**
   * Loads the table and removes the tag, returning no rows.
   *
   * Throws UnsupportedOperationException when the loaded table is not a SparkTable.
   */
  override protected def run(): Seq[InternalRow] = {
    catalog.loadTable(ident) match {
      case iceberg: SparkTable =>
        val tagIsAbsent = Option(iceberg.table().refs().get(tag)).isEmpty
        // Only "absent ref + IF EXISTS" is a no-op. Otherwise removal is always attempted,
        // so any failure for a missing ref is raised by removeTag itself.
        val skipRemoval = tagIsAbsent && ifExists
        if (!skipRemoval) {
          iceberg.table().manageSnapshots().removeTag(tag).commit()
        }

      case table =>
        throw new UnsupportedOperationException(s"Cannot drop tag on non-Iceberg table: $table")
    }

    Nil
  }

  override def simpleString(maxFields: Int): String =
    s"DropTag tag: ${tag} for table: ${ident.quoted}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable
import org.apache.spark.sql.catalyst.plans.logical.DropBranch
import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
import org.apache.spark.sql.catalyst.plans.logical.DropTag
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.MergeRows
import org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode
Expand Down Expand Up @@ -74,6 +75,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
case DropBranch(IcebergCatalogAndIdentifier(catalog, ident), branch, ifExists) =>
DropBranchExec(catalog, ident, branch, ifExists) :: Nil

case DropTag(IcebergCatalogAndIdentifier(catalog, ident), tag, ifExists) =>
DropTagExec(catalog, ident, tag, ifExists) :: Nil

case DropPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform) =>
DropPartitionFieldExec(catalog, ident, transform) :: Nil

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,28 @@ public void testDropBranchDoesNotExist() {
() -> sql("ALTER TABLE %s DROP BRANCH %s", tableName, "nonExistingBranch"));
}

@Test
public void testDropBranchFailsForTag() throws NoSuchTableException {
  // Create a tag, then verify that DROP BRANCH refuses to remove it.
  Table table = insertRows();
  String tagName = "b1";
  long currentSnapshotId = table.currentSnapshot().snapshotId();
  table.manageSnapshots().createTag(tagName, currentSnapshotId).commit();

  AssertHelpers.assertThrows(
      "Cannot perform drop branch on tag",
      IllegalArgumentException.class,
      "Ref b1 is a tag not a branch",
      () -> sql("ALTER TABLE %s DROP BRANCH %s", tableName, tagName));
}

@Test
public void testDropBranchNonConformingName() {
  // The name "123" is expected to be rejected while parsing the statement.
  String invalidBranchName = "123";

  AssertHelpers.assertThrows(
      "Non-conforming branch name",
      IcebergParseException.class,
      "mismatched input '123'",
      () -> sql("ALTER TABLE %s DROP BRANCH %s", tableName, invalidBranchName));
}

@Test
public void testDropMainBranchFails() {
AssertHelpers.assertThrows(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,10 @@ public void testCreateTagWithRetain() throws NoSuchTableException {
tableName, tagName, firstSnapshotId, maxRefAge, timeUnit);
table.refresh();
SnapshotRef ref = table.refs().get(tagName);
Assert.assertEquals(firstSnapshotId, ref.snapshotId());
Assert.assertEquals(
"The tag needs to point to a specific snapshot id.", firstSnapshotId, ref.snapshotId());
Assert.assertEquals(
"The tag needs to have the correct max ref age.",
TimeUnit.valueOf(timeUnit.toUpperCase(Locale.ENGLISH)).toMillis(maxRefAge),
ref.maxRefAgeMs().longValue());
}
Expand Down Expand Up @@ -132,8 +134,10 @@ public void testCreateTagUseDefaultConfig() throws NoSuchTableException {
sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName);
table.refresh();
SnapshotRef ref = table.refs().get(tagName);
Assert.assertEquals(snapshotId, ref.snapshotId());
Assert.assertNull(ref.maxRefAgeMs());
Assert.assertEquals(
"The tag needs to point to a specific snapshot id.", snapshotId, ref.snapshotId());
Assert.assertNull(
"The tag needs to have the default max ref age, which is null.", ref.maxRefAgeMs());

AssertHelpers.assertThrows(
"Cannot create an exist tag",
Expand All @@ -156,8 +160,10 @@ public void testCreateTagUseDefaultConfig() throws NoSuchTableException {
sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName, tagName, snapshotId);
table.refresh();
ref = table.refs().get(tagName);
Assert.assertEquals(snapshotId, ref.snapshotId());
Assert.assertNull(ref.maxRefAgeMs());
Assert.assertEquals(
"The tag needs to point to a specific snapshot id.", snapshotId, ref.snapshotId());
Assert.assertNull(
"The tag needs to have the default max ref age, which is null.", ref.maxRefAgeMs());
}

@Test
Expand All @@ -170,8 +176,14 @@ public void testCreateTagIfNotExists() throws NoSuchTableException {

table.refresh();
SnapshotRef ref = table.refs().get(tagName);
Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId());
Assert.assertEquals(TimeUnit.DAYS.toMillis(maxSnapshotAge), ref.maxRefAgeMs().longValue());
Assert.assertEquals(
"The tag needs to point to a specific snapshot id.",
table.currentSnapshot().snapshotId(),
ref.snapshotId());
Assert.assertEquals(
"The tag needs to have the correct max ref age.",
TimeUnit.DAYS.toMillis(maxSnapshotAge),
ref.maxRefAgeMs().longValue());
}

@Test
Expand Down Expand Up @@ -208,8 +220,12 @@ public void testReplaceTag() throws NoSuchTableException {
sql("ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d", tableName, tagName, second);
table.refresh();
SnapshotRef ref = table.refs().get(tagName);
Assert.assertEquals(second, ref.snapshotId());
Assert.assertEquals(expectedMaxRefAgeMs, ref.maxRefAgeMs().longValue());
Assert.assertEquals(
"The tag needs to point to a specific snapshot id.", second, ref.snapshotId());
Assert.assertEquals(
"The tag needs to have the correct max ref age.",
expectedMaxRefAgeMs,
ref.maxRefAgeMs().longValue());
}

@Test
Expand Down Expand Up @@ -243,9 +259,12 @@ public void testReplaceTagWithRetain() throws NoSuchTableException {

table.refresh();
SnapshotRef ref = table.refs().get(tagName);
Assert.assertEquals(second, ref.snapshotId());
Assert.assertEquals(
TimeUnit.valueOf(timeUnit).toMillis(maxRefAge), ref.maxRefAgeMs().longValue());
"The tag needs to point to a specific snapshot id.", second, ref.snapshotId());
Assert.assertEquals(
"The tag needs to have the correct max ref age.",
TimeUnit.valueOf(timeUnit).toMillis(maxRefAge),
ref.maxRefAgeMs().longValue());
}
}

Expand All @@ -261,7 +280,78 @@ public void testCreateOrReplace() throws NoSuchTableException {
sql("ALTER TABLE %s CREATE OR REPLACE TAG %s AS OF VERSION %d", tableName, tagName, first);
table.refresh();
SnapshotRef ref = table.refs().get(tagName);
Assert.assertEquals(first, ref.snapshotId());
Assert.assertEquals(
"The tag needs to point to a specific snapshot id.", first, ref.snapshotId());
}

@Test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also have a test to ensure that DROP TAG does not work when the ref is a branch. If I missed the opposite case for DROP BRANCH, I think let's add that as well for completeness.

public void testDropTag() throws NoSuchTableException {
  // Happy path: create a tag through the table API, then drop it through SQL.
  insertRows();
  Table table = validationCatalog.loadTable(tableIdent);
  String tagName = "t1";
  long currentSnapshotId = table.currentSnapshot().snapshotId();
  table.manageSnapshots().createTag(tagName, currentSnapshotId).commit();

  SnapshotRef createdRef = table.refs().get(tagName);
  Assert.assertEquals(
      "The tag needs to point to a specific snapshot id.",
      currentSnapshotId,
      createdRef.snapshotId());

  sql("ALTER TABLE %s DROP TAG %s", tableName, tagName);

  // Reload metadata so the refs reflect the change made through SQL.
  table.refresh();
  SnapshotRef droppedRef = table.refs().get(tagName);
  Assert.assertNull("The tag needs to be dropped.", droppedRef);
}

@Test
public void testDropTagNonConformingName() {
AssertHelpers.assertThrows(
"Non-conforming tag name",
IcebergParseException.class,
"mismatched input '123'",
() -> sql("ALTER TABLE %s DROP TAG %s", tableName, "123"));
Comment on lines +307 to +311
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine to include this test but could we separate it and have this test just focused on the happy case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

}

@Test
public void testDropTagDoesNotExist() {
  // Without IF EXISTS, dropping an unknown tag is expected to fail.
  String missingTag = "nonExistingTag";

  AssertHelpers.assertThrows(
      "Cannot perform drop tag on tag which does not exist",
      IllegalArgumentException.class,
      "Tag does not exist: nonExistingTag",
      () -> sql("ALTER TABLE %s DROP TAG %s", tableName, missingTag));
}

@Test
public void testDropTagFailesForBranch() throws NoSuchTableException {
  // Create a branch, then verify that DROP TAG refuses to remove it.
  Table table = insertRows();
  String branchName = "b1";
  long currentSnapshotId = table.currentSnapshot().snapshotId();
  table.manageSnapshots().createBranch(branchName, currentSnapshotId).commit();

  AssertHelpers.assertThrows(
      "Cannot perform drop tag on branch",
      IllegalArgumentException.class,
      "Ref b1 is a branch not a tag",
      () -> sql("ALTER TABLE %s DROP TAG %s", tableName, branchName));
}

@Test
public void testDropTagIfExists() throws NoSuchTableException {
  String tagName = "nonExistingTag";
  Table table = insertRows();
  Assert.assertNull("The tag does not exist.", table.refs().get(tagName));

  // Phase 1: DROP TAG IF EXISTS on a missing tag must be a no-op rather than an error.
  sql("ALTER TABLE %s DROP TAG IF EXISTS %s", tableName, tagName);
  table.refresh();
  Assert.assertNull("The tag still does not exist.", table.refs().get(tagName));

  // Phase 2: once the tag exists, the same statement must actually remove it.
  table.manageSnapshots().createTag(tagName, table.currentSnapshot().snapshotId()).commit();
  Assert.assertEquals(
      "The tag has been created successfully.",
      table.currentSnapshot().snapshotId(),
      table.refs().get(tagName).snapshotId());

  sql("ALTER TABLE %s DROP TAG IF EXISTS %s", tableName, tagName);
  table.refresh();
  Assert.assertNull("The tag needs to be dropped.", table.refs().get(tagName));
}

private Table insertRows() throws NoSuchTableException {
Expand Down