Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
static final EnumSet<AlterTableType> SUPPORTED_ALTER_OPS = EnumSet.of(
AlterTableType.ADDCOLS, AlterTableType.REPLACE_COLUMNS, AlterTableType.RENAME_COLUMN,
AlterTableType.ADDPROPS, AlterTableType.DROPPROPS, AlterTableType.SETPARTITIONSPEC,
AlterTableType.UPDATE_COLUMNS, AlterTableType.RENAME, AlterTableType.EXECUTE);
AlterTableType.UPDATE_COLUMNS, AlterTableType.RENAME, AlterTableType.EXECUTE, AlterTableType.CREATE_BRANCH);
private static final List<String> MIGRATION_ALLOWED_SOURCE_FORMATS = ImmutableList.of(
FileFormat.PARQUET.name().toLowerCase(),
FileFormat.ORC.name().toLowerCase(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.parse.AlterTableBranchSpec;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
import org.apache.hadoop.hive.ql.parse.PartitionTransform;
import org.apache.hadoop.hive.ql.parse.SemanticException;
Expand Down Expand Up @@ -716,6 +717,28 @@ private static ExecutorService getDeleteExecutorService(String completeName, int
});
}

@Override
public void alterTableBranchOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
AlterTableBranchSpec alterBranchSpec) {
TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
Table icebergTable = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
Optional.ofNullable(icebergTable.currentSnapshot()).orElseThrow(() ->
new UnsupportedOperationException(String.format("Cannot alter branch on iceberg table" +
" %s.%s which has no snapshot", hmsTable.getDbName(), hmsTable.getTableName())));

switch (alterBranchSpec.getOperationType()) {
case CREATE_BRANCH:
AlterTableBranchSpec.CreateBranchSpec createBranchSpec =
(AlterTableBranchSpec.CreateBranchSpec) alterBranchSpec.getOperationParams();
IcebergBranchExec.createBranch(icebergTable, createBranchSpec);
break;
default:
throw new UnsupportedOperationException(
String.format("Operation type %s is not supported", alterBranchSpec.getOperationType().name()));
}

}

@Override
public boolean isValidMetadataTable(String metaTableName) {
return IcebergMetadataTables.isValidMetaTable(metaTableName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import org.apache.hadoop.hive.ql.parse.AlterTableBranchSpec;
import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergBranchExec {

private static final Logger LOG = LoggerFactory.getLogger(IcebergBranchExec.class);

private IcebergBranchExec() {
}

/**
* Create a branch on the iceberg table
* @param table the iceberg table
* @param createBranchSpec Get the basic parameters needed to create a branch
*/
public static void createBranch(Table table, AlterTableBranchSpec.CreateBranchSpec createBranchSpec) {
String branchName = createBranchSpec.getBranchName();
Long snapshotId = null;
if (createBranchSpec.getSnapshotId() != null) {
snapshotId = createBranchSpec.getSnapshotId();
} else if (createBranchSpec.getAsOfTime() != null) {
snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, createBranchSpec.getAsOfTime());
} else {
snapshotId = table.currentSnapshot().snapshotId();
}
LOG.info("Creating branch {} on iceberg table {} with snapshotId {}", branchName, table.name(), snapshotId);
ManageSnapshots manageSnapshots = table.manageSnapshots();
manageSnapshots.createBranch(branchName, snapshotId);
if (createBranchSpec.getMaxRefAgeMs() != null) {
manageSnapshots.setMaxRefAgeMs(branchName, createBranchSpec.getMaxRefAgeMs());
}
if (createBranchSpec.getMinSnapshotsToKeep() != null) {
manageSnapshots.setMinSnapshotsToKeep(branchName, createBranchSpec.getMinSnapshotsToKeep());
}
if (createBranchSpec.getMaxSnapshotAgeMs() != null) {
manageSnapshots.setMaxSnapshotAgeMs(branchName, createBranchSpec.getMaxSnapshotAgeMs());
}

manageSnapshots.commit();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.junit.Assert;
import org.junit.Test;

import static org.apache.iceberg.mr.hive.HiveIcebergTestUtils.timestampAfterSnapshot;

public class TestHiveIcebergBranchOperation extends HiveIcebergStorageHandlerWithEngineBase {

@Test
public void testCreateBranchWithDefaultConfig() throws InterruptedException, IOException {
Table table =
testTables.createTableWithVersions(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

String branchName = "test_branch_1";
shell.executeStatement(String.format("ALTER TABLE customers CREATE BRANCH %s", branchName));
table.refresh();
SnapshotRef ref = table.refs().get(branchName);
Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId());
Assert.assertNull(ref.minSnapshotsToKeep());
Assert.assertNull(ref.maxSnapshotAgeMs());
Assert.assertNull(ref.maxRefAgeMs());

// creating a branch which is already exists will fail
try {
shell.executeStatement(String.format("ALTER TABLE customers CREATE BRANCH %s", branchName));
} catch (Throwable e) {
while (e.getCause() != null) {
e = e.getCause();
}
Assert.assertTrue(e.getMessage().contains("Ref test_branch_1 already exists"));
}
}

@Test
public void testCreateBranchWithSnapshotId() throws InterruptedException, IOException {
Table table =
testTables.createTableWithVersions(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

String branchName = "test_branch_1";
Long snapshotId = table.history().get(0).snapshotId();
shell.executeStatement(String.format("ALTER TABLE customers CREATE BRANCH %s FOR SYSTEM_VERSION AS OF %d",
branchName, snapshotId));
table.refresh();
SnapshotRef ref = table.refs().get(branchName);
Assert.assertEquals(snapshotId.longValue(), ref.snapshotId());
Assert.assertNull(ref.minSnapshotsToKeep());
Assert.assertNull(ref.maxSnapshotAgeMs());
Assert.assertNull(ref.maxRefAgeMs());
}

@Test
public void testCreateBranchWithTimeStamp() throws InterruptedException, IOException {
Table table =
testTables.createTableWithVersions(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

String branchName = "test_branch_1";
Long snapshotId = table.history().get(0).snapshotId();

shell.executeStatement(String.format("ALTER TABLE customers CREATE BRANCH %s FOR SYSTEM_TIME AS OF '%s'",
branchName, timestampAfterSnapshot(table, 0)));
table.refresh();
SnapshotRef ref = table.refs().get(branchName);
Assert.assertEquals(snapshotId.longValue(), ref.snapshotId());
}

@Test
public void testCreateBranchWithMaxRefAge() throws InterruptedException, IOException {
Table table =
testTables.createTableWithVersions(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

String branchName = "test_branch_1";
long maxRefAge = 5L;
shell.executeStatement(String.format("ALTER TABLE customers CREATE BRANCH %s RETAIN %d DAYS",
branchName, maxRefAge));
table.refresh();
SnapshotRef ref = table.refs().get(branchName);
Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId());
Assert.assertNull(ref.minSnapshotsToKeep());
Assert.assertNull(ref.maxSnapshotAgeMs());
Assert.assertEquals(TimeUnit.DAYS.toMillis(maxRefAge), ref.maxRefAgeMs().longValue());
}

@Test
public void testCreateBranchWithMinSnapshotsToKeep() throws InterruptedException, IOException {
Table table =
testTables.createTableWithVersions(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

String branchName = "test_branch_1";
Integer minSnapshotsToKeep = 2;
shell.executeStatement(String.format("ALTER TABLE customers CREATE BRANCH %s WITH SNAPSHOT RETENTION %d SNAPSHOTS",
branchName, minSnapshotsToKeep));
table.refresh();
SnapshotRef ref = table.refs().get(branchName);
Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId());
Assert.assertEquals(minSnapshotsToKeep, ref.minSnapshotsToKeep());
Assert.assertNull(ref.maxSnapshotAgeMs());
Assert.assertNull(ref.maxRefAgeMs());
}

@Test
public void testCreateBranchWithMinSnapshotsToKeepAndMaxSnapshotAge() throws InterruptedException, IOException {
Table table =
testTables.createTableWithVersions(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

String branchName = "test_branch_1";
Integer minSnapshotsToKeep = 2;
long maxSnapshotAge = 2L;
shell.executeStatement(String.format("ALTER TABLE customers CREATE BRANCH %s WITH SNAPSHOT RETENTION %d SNAPSHOTS" +
" %d DAYS", branchName, minSnapshotsToKeep, maxSnapshotAge));
table.refresh();
SnapshotRef ref = table.refs().get(branchName);
Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId());
Assert.assertEquals(minSnapshotsToKeep, ref.minSnapshotsToKeep());
Assert.assertEquals(TimeUnit.DAYS.toMillis(maxSnapshotAge), ref.maxSnapshotAgeMs().longValue());
Assert.assertNull(ref.maxRefAgeMs());
}

@Test
public void testCreateBranchWithAllCustomConfig() throws IOException, InterruptedException {
Table table =
testTables.createTableWithVersions(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

String branchName = "test_branch_1";
Long snapshotId = table.history().get(0).snapshotId();
Integer minSnapshotsToKeep = 2;
long maxSnapshotAge = 2L;
long maxRefAge = 5L;
shell.executeStatement(String.format("ALTER TABLE customers CREATE BRANCH %s FOR SYSTEM_VERSION AS OF %d RETAIN" +
" %d DAYS WITH SNAPSHOT RETENTION %d SNAPSHOTS %d days",
branchName, snapshotId, maxRefAge, minSnapshotsToKeep, maxSnapshotAge));
table.refresh();
SnapshotRef ref = table.refs().get(branchName);
Assert.assertEquals(snapshotId.longValue(), ref.snapshotId());
Assert.assertEquals(minSnapshotsToKeep, ref.minSnapshotsToKeep());
Assert.assertEquals(TimeUnit.DAYS.toMillis(maxSnapshotAge), ref.maxSnapshotAgeMs().longValue());
Assert.assertEquals(TimeUnit.DAYS.toMillis(maxRefAge), ref.maxRefAgeMs().longValue());
}

@Test
public void testCreateBranchWithNonIcebergTable() {
shell.executeStatement("create table nonice_tbl (id int, name string)");

String branchName = "test_branch_1";
try {
shell.executeStatement(String.format("ALTER TABLE nonice_tbl CREATE BRANCH %s", branchName));
} catch (Throwable e) {
while (e.getCause() != null) {
e = e.getCause();
}
Assert.assertTrue(e.getMessage().contains("Not an iceberg table"));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
create table ice_tbl (id int, name string) Stored by Iceberg;

alter table ice_tbl create branch test_branch_1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- SORT_QUERY_RESULTS
set hive.explain.user=false;

create table iceTbl (id int, name string) Stored by Iceberg;

-- creating branch requires table to have current snapshot. here insert some values to generate current snapshot
insert into iceTbl values(1, 'jack');

-- create s branch test_branch_1 with default values based on the current snapshotId
explain alter table iceTbl create branch test_branch_1;
alter table iceTbl create branch test_branch_1;
-- check the values, one value
select * from iceTbl for system_version as of 'test_branch_1';

-- create a branch test_branch_2 which could be retained 5 days based on the current snapshotId
insert into iceTbl values(2, 'bob');
explain alter table iceTbl create branch test_branch_2 retain 5 days;
alter table iceTbl create branch test_branch_2 retain 5 days;
-- check the values, two values
select * from iceTbl for system_version as of 'test_branch_2';

-- create a branch test_branch_3 with 5 snapshots based on the current snapshotId
insert into iceTbl values(3, 'tom');
explain alter table iceTbl create branch test_branch_3 with snapshot retention 5 snapshots;
alter table iceTbl create branch test_branch_3 with snapshot retention 5 snapshots;
-- check the values, three values
select * from iceTbl for system_version as of 'test_branch_3';

-- create a branch test_branch_4 based on the current snapshotId that has 5 snapshots, each of which is retained for 5 days
insert into iceTbl values(4, 'lisa');
explain alter table iceTbl create branch test_branch_4 with snapshot retention 5 snapshots 5 days;
alter table iceTbl create branch test_branch_4 with snapshot retention 5 snapshots 5 days;
-- check the values, four values
select * from iceTbl for system_version as of 'test_branch_4';
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
PREHOOK: query: create table ice_tbl (id int, name string) Stored by Iceberg
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@ice_tbl
POSTHOOK: query: create table ice_tbl (id int, name string) Stored by Iceberg
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@ice_tbl
PREHOOK: query: alter table ice_tbl create branch test_branch_1
PREHOOK: type: ALTERTABLE_CREATEBRANCH
PREHOOK: Input: default@ice_tbl
FAILED: Execution Error, return code 40000 from org.apache.hadoop.hive.ql.ddl.DDLTask. java.lang.UnsupportedOperationException: Cannot alter branch on iceberg table default.ice_tbl which has no snapshot
Loading