Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ statement
| ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList #dropIdentifierFields
| ALTER TABLE multipartIdentifier createReplaceBranchClause #createOrReplaceBranch
| ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag
| ALTER TABLE multipartIdentifier DROP BRANCH (IF EXISTS)? identifier #dropBranch
;

createReplaceTagClause
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
normalized.contains("create branch") ||
normalized.contains("replace branch") ||
normalized.contains("create tag") ||
normalized.contains("replace tag")
normalized.contains("replace tag") ||
normalized.contains("drop branch")
}

protected def parse[T](command: String)(toResult: IcebergSqlExtensionsParser => T): T = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.spark.sql.catalyst.plans.logical.CallArgument
import org.apache.spark.sql.catalyst.plans.logical.CallStatement
import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch
import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag
import org.apache.spark.sql.catalyst.plans.logical.DropBranch
import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
Expand Down Expand Up @@ -161,6 +162,13 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
ifNotExists)
}

/**
 * Create a DROP BRANCH logical command from the parsed
 * `ALTER TABLE ... DROP BRANCH (IF EXISTS)? identifier` statement.
 */
override def visitDropBranch(ctx: DropBranchContext): DropBranch = withOrigin(ctx) {
  val tableNameParts = typedVisit[Seq[String]](ctx.multipartIdentifier)
  val branchName = ctx.identifier().getText
  // The EXISTS token accessor yields null when the optional IF EXISTS clause was not written.
  val ifExists = ctx.EXISTS() != null
  DropBranch(tableNameParts, branchName, ifExists)
}

/**
* Create a REPLACE PARTITION FIELD logical command.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.Attribute

case class CreateOrReplaceBranch(table: Seq[String], branch: String,
branchOptions: BranchOptions, replace: Boolean, ifNotExists: Boolean)
extends LeafCommand {
case class CreateOrReplaceBranch(
table: Seq[String],
branch: String,
branchOptions: BranchOptions,
replace: Boolean,
ifNotExists: Boolean) extends LeafCommand {

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.Attribute

/**
 * Logical command describing a request to drop a branch from a table.
 *
 * @param table multi-part name of the table the branch belongs to
 * @param branch name of the branch to drop
 * @param ifExists flag carried alongside the branch name; not used by this node itself
 */
case class DropBranch(
    table: Seq[String],
    branch: String,
    ifExists: Boolean) extends LeafCommand {

  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

  // This command produces no output columns.
  override lazy val output: Seq[Attribute] = Seq.empty

  /** One-line description of this node, naming the branch and the quoted table name. */
  override def simpleString(maxFields: Int): String =
    s"DropBranch branch: ${branch} for table: ${table.quoted}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableCatalog

/**
 * Physical command that removes a branch from an Iceberg table.
 *
 * The table is loaded through `catalog`. When it is a [[SparkTable]], the branch is removed via
 * the table's snapshot-management API, unless the branch is absent and `ifExists` is set, in
 * which case nothing is committed. Any other table implementation is rejected.
 *
 * @param catalog catalog used to load the table
 * @param ident identifier of the table within `catalog`
 * @param branch name of the branch to remove
 * @param ifExists when true, a missing branch is silently ignored
 */
case class DropBranchExec(
    catalog: TableCatalog,
    ident: Identifier,
    branch: String,
    ifExists: Boolean) extends LeafV2CommandExec {

  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

  // This command produces no output columns.
  override lazy val output: Seq[Attribute] = Seq.empty

  /**
   * Execute the drop and return no rows.
   *
   * @throws UnsupportedOperationException if the loaded table is not a [[SparkTable]]
   */
  override protected def run(): Seq[InternalRow] = {
    catalog.loadTable(ident) match {
      case sparkTable: SparkTable =>
        val icebergTable = sparkTable.table()
        val branchMissing = icebergTable.refs().get(branch) == null
        // Skip only when the branch is missing AND IF EXISTS was requested. Without IF EXISTS the
        // removal is still attempted so that removeBranch reports the missing branch itself.
        if (!(branchMissing && ifExists)) {
          icebergTable.manageSnapshots().removeBranch(branch).commit()
        }

      case table =>
        throw new UnsupportedOperationException(s"Cannot drop branch on non-Iceberg table: $table")
    }

    Seq.empty
  }

  /** One-line description of this node, naming the branch and the quoted table identifier. */
  override def simpleString(maxFields: Int): String =
    s"DropBranch branch: ${branch} for table: ${ident.quoted}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Call
import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch
import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag
import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable
import org.apache.spark.sql.catalyst.plans.logical.DropBranch
import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
Expand Down Expand Up @@ -70,6 +71,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
case CreateOrReplaceTag(IcebergCatalogAndIdentifier(catalog, ident), tag, tagOptions, replace, ifNotExists) =>
CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, replace, ifNotExists) :: Nil

case DropBranch(IcebergCatalogAndIdentifier(catalog, ident), branch, ifExists) =>
DropBranchExec(catalog, ident, branch, ifExists) :: Nil

case DropPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform) =>
DropPartitionFieldExec(catalog, ident, transform) :: Nil

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,21 @@
import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized;

public class TestCreateBranch extends SparkExtensionsTestBase {
public class TestBranchDDL extends SparkExtensionsTestBase {

/** Creates the two-column Iceberg table used by every test before it runs. */
@Before
public void before() {
  String createTableDdl = "CREATE TABLE %s (id INT, data STRING) USING iceberg";
  sql(createTableDdl, tableName);
}

/** Drops the test table after each test; IF EXISTS keeps this safe when the table is gone. */
@After
public void removeTable() {
  String dropTableDdl = "DROP TABLE IF EXISTS %s";
  sql(dropTableDdl, tableName);
}

@Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
public static Object[][] parameters() {
Expand All @@ -49,18 +60,13 @@ public static Object[][] parameters() {
};
}

public TestCreateBranch(String catalogName, String implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}

@After
public void removeTable() {
sql("DROP TABLE IF EXISTS %s", tableName);
public TestBranchDDL(String catalog, String implementation, Map<String, String> properties) {
super(catalog, implementation, properties);
}

@Test
public void testCreateBranch() throws NoSuchTableException {
Table table = createDefaultTableAndInsert2Row();
Table table = insertRows();
long snapshotId = table.currentSnapshot().snapshotId();
String branchName = "b1";
Integer minSnapshotsToKeep = 2;
Expand All @@ -79,13 +85,13 @@ public void testCreateBranch() throws NoSuchTableException {
AssertHelpers.assertThrows(
"Cannot create an existing branch",
IllegalArgumentException.class,
"already exists",
"Ref b1 already exists",
() -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName));
}

@Test
public void testCreateBranchUseDefaultConfig() throws NoSuchTableException {
Table table = createDefaultTableAndInsert2Row();
Table table = insertRows();
String branchName = "b1";
sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
table.refresh();
Expand All @@ -99,7 +105,7 @@ public void testCreateBranchUseDefaultConfig() throws NoSuchTableException {
@Test
public void testCreateBranchUseCustomMinSnapshotsToKeep() throws NoSuchTableException {
Integer minSnapshotsToKeep = 2;
Table table = createDefaultTableAndInsert2Row();
Table table = insertRows();
String branchName = "b1";
sql(
"ALTER TABLE %s CREATE BRANCH %s WITH SNAPSHOT RETENTION %d SNAPSHOTS",
Expand All @@ -115,7 +121,7 @@ public void testCreateBranchUseCustomMinSnapshotsToKeep() throws NoSuchTableExce
@Test
public void testCreateBranchUseCustomMaxSnapshotAge() throws NoSuchTableException {
long maxSnapshotAge = 2L;
Table table = createDefaultTableAndInsert2Row();
Table table = insertRows();
String branchName = "b1";
sql(
"ALTER TABLE %s CREATE BRANCH %s WITH SNAPSHOT RETENTION %d DAYS",
Expand All @@ -131,7 +137,7 @@ public void testCreateBranchUseCustomMaxSnapshotAge() throws NoSuchTableExceptio
@Test
public void testCreateBranchIfNotExists() throws NoSuchTableException {
long maxSnapshotAge = 2L;
Table table = createDefaultTableAndInsert2Row();
Table table = insertRows();
String branchName = "b1";
sql(
"ALTER TABLE %s CREATE BRANCH %s WITH SNAPSHOT RETENTION %d DAYS",
Expand All @@ -151,7 +157,7 @@ public void testCreateBranchUseCustomMinSnapshotsToKeepAndMaxSnapshotAge()
throws NoSuchTableException {
Integer minSnapshotsToKeep = 2;
long maxSnapshotAge = 2L;
Table table = createDefaultTableAndInsert2Row();
Table table = insertRows();
String branchName = "b1";
sql(
"ALTER TABLE %s CREATE BRANCH %s WITH SNAPSHOT RETENTION %d SNAPSHOTS %d DAYS",
Expand All @@ -174,7 +180,7 @@ public void testCreateBranchUseCustomMinSnapshotsToKeepAndMaxSnapshotAge()
@Test
public void testCreateBranchUseCustomMaxRefAge() throws NoSuchTableException {
long maxRefAge = 10L;
Table table = createDefaultTableAndInsert2Row();
Table table = insertRows();
String branchName = "b1";
sql("ALTER TABLE %s CREATE BRANCH %s RETAIN %d DAYS", tableName, branchName, maxRefAge);
table.refresh();
Expand Down Expand Up @@ -206,9 +212,55 @@ public void testCreateBranchUseCustomMaxRefAge() throws NoSuchTableException {
tableName, branchName, maxRefAge));
}

private Table createDefaultTableAndInsert2Row() throws NoSuchTableException {
sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
/** A branch created through the table API is gone after ALTER TABLE ... DROP BRANCH. */
@Test
public void testDropBranch() throws NoSuchTableException {
  insertRows();

  String branchName = "b1";
  Table table = validationCatalog.loadTable(tableIdent);

  // Create the branch at the current snapshot and confirm it points there.
  long currentSnapshotId = table.currentSnapshot().snapshotId();
  table.manageSnapshots().createBranch(branchName, currentSnapshotId).commit();
  SnapshotRef createdRef = table.refs().get(branchName);
  Assert.assertEquals(table.currentSnapshot().snapshotId(), createdRef.snapshotId());

  sql("ALTER TABLE %s DROP BRANCH %s", tableName, branchName);
  table.refresh();

  // After the refresh the ref must no longer be listed.
  Assert.assertNull(table.refs().get(branchName));
}

/** Dropping an unknown branch without IF EXISTS must fail with a "does not exist" error. */
@Test
public void testDropBranchDoesNotExist() {
  String missingBranch = "nonExistingBranch";
  AssertHelpers.assertThrows(
      "Cannot perform drop branch on branch which does not exist",
      IllegalArgumentException.class,
      "Branch does not exist: nonExistingBranch",
      () -> sql("ALTER TABLE %s DROP BRANCH %s", tableName, missingBranch));
}

/** The main branch cannot be dropped; the statement must be rejected. */
@Test
public void testDropMainBranchFails() {
  String expectedMessage = "Cannot remove main branch";
  AssertHelpers.assertThrows(
      "Cannot drop the main branch",
      IllegalArgumentException.class,
      expectedMessage,
      () -> sql("ALTER TABLE %s DROP BRANCH main", tableName));
}

/** DROP BRANCH IF EXISTS on an unknown branch succeeds and leaves the table without that ref. */
@Test
public void testDropBranchIfExists() {
  Table table = validationCatalog.loadTable(tableIdent);
  String branchName = "nonExistingBranch";

  // Precondition: the branch is not present to begin with.
  Assert.assertNull(table.refs().get(branchName));

  sql("ALTER TABLE %s DROP BRANCH IF EXISTS %s", tableName, branchName);
  table.refresh();

  Assert.assertNull(table.refs().get(branchName));
}

private Table insertRows() throws NoSuchTableException {
List<SimpleRecord> records =
ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
Expand Down