Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
AlterTableType.ADDPROPS, AlterTableType.DROPPROPS, AlterTableType.SETPARTITIONSPEC,
AlterTableType.UPDATE_COLUMNS, AlterTableType.RENAME, AlterTableType.EXECUTE, AlterTableType.CREATE_BRANCH,
AlterTableType.CREATE_TAG, AlterTableType.DROP_BRANCH, AlterTableType.RENAME_BRANCH, AlterTableType.DROPPARTITION,
AlterTableType.DROP_TAG, AlterTableType.COMPACT, AlterTableType.REPLACE_BRANCH);
AlterTableType.DROP_TAG, AlterTableType.COMPACT, AlterTableType.REPLACE_SNAPSHOTREF);
private static final List<String> MIGRATION_ALLOWED_SOURCE_FORMATS = ImmutableList.of(
FileFormat.PARQUET.name().toLowerCase(),
FileFormat.ORC.name().toLowerCase(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ public void alterTableSnapshotRefOperation(org.apache.hadoop.hive.ql.metadata.Ta
case CREATE_BRANCH:
AlterTableSnapshotRefSpec.CreateSnapshotRefSpec createBranchSpec =
(AlterTableSnapshotRefSpec.CreateSnapshotRefSpec) alterTableSnapshotRefSpec.getOperationParams();
IcebergBranchExec.createBranch(icebergTable, createBranchSpec);
IcebergSnapshotRefExec.createBranch(icebergTable, createBranchSpec);
break;
case CREATE_TAG:
Optional.ofNullable(icebergTable.currentSnapshot()).orElseThrow(() -> new UnsupportedOperationException(
Expand All @@ -998,27 +998,31 @@ public void alterTableSnapshotRefOperation(org.apache.hadoop.hive.ql.metadata.Ta
hmsTable.getTableName())));
AlterTableSnapshotRefSpec.CreateSnapshotRefSpec createTagSpec =
(AlterTableSnapshotRefSpec.CreateSnapshotRefSpec) alterTableSnapshotRefSpec.getOperationParams();
IcebergTagExec.createTag(icebergTable, createTagSpec);
IcebergSnapshotRefExec.createTag(icebergTable, createTagSpec);
break;
case DROP_BRANCH:
AlterTableSnapshotRefSpec.DropSnapshotRefSpec dropBranchSpec =
(AlterTableSnapshotRefSpec.DropSnapshotRefSpec) alterTableSnapshotRefSpec.getOperationParams();
IcebergBranchExec.dropBranch(icebergTable, dropBranchSpec);
IcebergSnapshotRefExec.dropBranch(icebergTable, dropBranchSpec);
break;
case RENAME_BRANCH:
AlterTableSnapshotRefSpec.RenameSnapshotrefSpec renameSnapshotrefSpec =
(AlterTableSnapshotRefSpec.RenameSnapshotrefSpec) alterTableSnapshotRefSpec.getOperationParams();
IcebergBranchExec.renameBranch(icebergTable, renameSnapshotrefSpec);
IcebergSnapshotRefExec.renameBranch(icebergTable, renameSnapshotrefSpec);
break;
case REPLACE_BRANCH:
case REPLACE_SNAPSHOTREF:
AlterTableSnapshotRefSpec.ReplaceSnapshotrefSpec replaceSnapshotrefSpec =
(AlterTableSnapshotRefSpec.ReplaceSnapshotrefSpec) alterTableSnapshotRefSpec.getOperationParams();
IcebergBranchExec.replaceBranch(icebergTable, replaceSnapshotrefSpec);
if (replaceSnapshotrefSpec.isReplaceBranch()) {
IcebergSnapshotRefExec.replaceBranch(icebergTable, replaceSnapshotrefSpec);
} else {
IcebergSnapshotRefExec.replaceTag(icebergTable, replaceSnapshotrefSpec);
}
break;
case DROP_TAG:
AlterTableSnapshotRefSpec.DropSnapshotRefSpec dropTagSpec =
(AlterTableSnapshotRefSpec.DropSnapshotRefSpec) alterTableSnapshotRefSpec.getOperationParams();
IcebergTagExec.dropTag(icebergTable, dropTagSpec);
IcebergSnapshotRefExec.dropTag(icebergTable, dropTagSpec);
break;
default:
throw new UnsupportedOperationException(String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergBranchExec {
public class IcebergSnapshotRefExec {

private static final Logger LOG = LoggerFactory.getLogger(IcebergBranchExec.class);
private static final Logger LOG = LoggerFactory.getLogger(IcebergSnapshotRefExec.class);

private IcebergBranchExec() {
private IcebergSnapshotRefExec() {
}

/**
Expand Down Expand Up @@ -100,7 +100,7 @@ public static void renameBranch(Table table, AlterTableSnapshotRefSpec.RenameSna

public static void replaceBranch(Table table,
AlterTableSnapshotRefSpec.ReplaceSnapshotrefSpec replaceSnapshotrefSpec) {
String sourceBranch = replaceSnapshotrefSpec.getSourceBranchName();
String sourceBranch = replaceSnapshotrefSpec.getSourceRefName();
ManageSnapshots manageSnapshots;
if (replaceSnapshotrefSpec.isReplaceBySnapshot()) {
long targetSnapshot = replaceSnapshotrefSpec.getTargetSnapshot();
Expand All @@ -111,6 +111,52 @@ public static void replaceBranch(Table table,
LOG.info("Replacing branch {} with branch {} on iceberg table {}", sourceBranch, targetBranch, table.name());
manageSnapshots = table.manageSnapshots().replaceBranch(sourceBranch, targetBranch);
}
setOptionalReplaceParams(replaceSnapshotrefSpec, manageSnapshots, sourceBranch);
manageSnapshots.commit();
}

public static void createTag(Table table, AlterTableSnapshotRefSpec.CreateSnapshotRefSpec createTagSpec) {
String tagName = createTagSpec.getRefName();
Long snapshotId = null;
if (createTagSpec.getSnapshotId() != null) {
snapshotId = createTagSpec.getSnapshotId();
} else if (createTagSpec.getAsOfTime() != null) {
snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, createTagSpec.getAsOfTime());
} else {
snapshotId = table.currentSnapshot().snapshotId();
}
LOG.info("Creating tag {} on iceberg table {} with snapshotId {}", tagName, table.name(), snapshotId);
ManageSnapshots manageSnapshots = table.manageSnapshots();
manageSnapshots.createTag(tagName, snapshotId);
if (createTagSpec.getMaxRefAgeMs() != null) {
manageSnapshots.setMaxRefAgeMs(tagName, createTagSpec.getMaxRefAgeMs());
}

manageSnapshots.commit();
}

public static void dropTag(Table table, AlterTableSnapshotRefSpec.DropSnapshotRefSpec dropTagSpec) {
String tagName = dropTagSpec.getRefName();
boolean ifExists = dropTagSpec.getIfExists();

SnapshotRef snapshotRef = table.refs().get(tagName);
if (snapshotRef != null || !ifExists) {
LOG.info("Dropping tag {} on iceberg table {}", tagName, table.name());
table.manageSnapshots().removeTag(tagName).commit();
}
}

public static void replaceTag(Table table, AlterTableSnapshotRefSpec.ReplaceSnapshotrefSpec replaceTagRefSpec) {
String sourceTag = replaceTagRefSpec.getSourceRefName();
long targetSnapshot = replaceTagRefSpec.getTargetSnapshot();
LOG.info("Replacing tag {} with snapshot {} on iceberg table {}", sourceTag, targetSnapshot, table.name());
ManageSnapshots manageSnapshots = table.manageSnapshots().replaceTag(sourceTag, targetSnapshot);
setOptionalReplaceParams(replaceTagRefSpec, manageSnapshots, sourceTag);
manageSnapshots.commit();
}

static void setOptionalReplaceParams(AlterTableSnapshotRefSpec.ReplaceSnapshotrefSpec replaceSnapshotrefSpec,
ManageSnapshots manageSnapshots, String sourceBranch) {
if (replaceSnapshotrefSpec.getMaxRefAgeMs() > 0) {
manageSnapshots.setMaxRefAgeMs(sourceBranch, replaceSnapshotrefSpec.getMaxRefAgeMs());
}
Expand All @@ -120,6 +166,5 @@ public static void replaceBranch(Table table,
if (replaceSnapshotrefSpec.getMinSnapshotsToKeep() > 0) {
manageSnapshots.setMinSnapshotsToKeep(sourceBranch, replaceSnapshotrefSpec.getMinSnapshotsToKeep());
}
manageSnapshots.commit();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
Expand Down Expand Up @@ -209,4 +211,47 @@ public void testDropTag() throws InterruptedException, IOException {
Assert.assertTrue(e.getMessage().contains("Tag does not exist: test_tag_1"));
}
}

@Test
public void testReplaceTag() {
TableIdentifier identifier = TableIdentifier.of("default", "testReplaceTag");
shell.executeStatement(
String.format("CREATE EXTERNAL TABLE %s (id INT) STORED BY iceberg %s %s",
identifier.name(),
testTables.locationForCreateTableSQL(identifier),
testTables.propertiesForCreateTableSQL(ImmutableMap.of())));

shell.executeStatement(String.format("INSERT INTO TABLE %s VALUES(1),(2),(3),(4)", identifier.name()));

org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
icebergTable.refresh();
long id1 = icebergTable.currentSnapshot().snapshotId();
// Create a branch
shell.executeStatement(String.format("ALTER TABLE %s create tag tag1", identifier.name()));
// Make one new insert to the main branch
shell.executeStatement(String.format("INSERT INTO TABLE %s VALUES(5),(6)", identifier.name()));
icebergTable.refresh();
long id2 = icebergTable.currentSnapshot().snapshotId();

// Make another insert so that the commit isn't the last commit on the branch
shell.executeStatement(String.format("INSERT INTO TABLE %s VALUES(7),(8)", identifier.name()));

// Validate the original count on branch before replace
List<Object[]> result =
shell.executeStatement("SELECT COUNT(*) FROM default.testReplaceTag.tag_tag1");
Assert.assertEquals(4L, result.get(0)[0]);
// Perform replace tag with snapshot id.
shell.executeStatement(
String.format("ALTER TABLE %s replace tag tag1 as of system_version %s", identifier.name(), id2));
result = shell.executeStatement("SELECT COUNT(*) FROM default.testReplaceTag.tag_tag1");
Assert.assertEquals(6L, result.get(0)[0]);

// Perform replace tag with retain
shell.executeStatement(
String.format("ALTER TABLE %s replace tag tag1 as of system_version %s retain 2 days", identifier.name(), id1));
result = shell.executeStatement("SELECT COUNT(*) FROM default.testReplaceTag.tag_tag1");
Assert.assertEquals(4L, result.get(0)[0]);
icebergTable.refresh();
Assert.assertEquals(TimeUnit.DAYS.toMillis(2), icebergTable.refs().get("tag1").maxRefAgeMs().longValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
4
44
PREHOOK: query: explain alter table ice01 replace branch branch1 as of branch branch2
PREHOOK: type: ALTERTABLE_REPLACEBRANCH
PREHOOK: type: ALTERTABLE_REPLACESNAPSHOTREF
PREHOOK: Input: default@ice01
POSTHOOK: query: explain alter table ice01 replace branch branch1 as of branch branch2
POSTHOOK: type: ALTERTABLE_REPLACEBRANCH
POSTHOOK: type: ALTERTABLE_REPLACESNAPSHOTREF
POSTHOOK: Input: default@ice01
STAGE DEPENDENCIES:
Stage-0 is a root stage
Expand All @@ -102,13 +102,13 @@ STAGE PLANS:
Stage: Stage-0
SnapshotRef Operation
table name: default.ice01
spec: AlterTableSnapshotRefSpec{operationType=REPLACE_BRANCH, operationParams=ReplaceSnapshotrefSpec{sourceBranch=branch1, targetBranch=branch2}}
spec: AlterTableSnapshotRefSpec{operationType=REPLACE_SNAPSHOTREF, operationParams=ReplaceSnapshotrefSpec{sourceRef=branch1, replace=Branch, targetBranch=branch2}}

PREHOOK: query: alter table ice01 replace branch branch1 as of branch branch2
PREHOOK: type: ALTERTABLE_REPLACEBRANCH
PREHOOK: type: ALTERTABLE_REPLACESNAPSHOTREF
PREHOOK: Input: default@ice01
POSTHOOK: query: alter table ice01 replace branch branch1 as of branch branch2
POSTHOOK: type: ALTERTABLE_REPLACEBRANCH
POSTHOOK: type: ALTERTABLE_REPLACESNAPSHOTREF
POSTHOOK: Input: default@ice01
PREHOOK: query: select * from default.ice01.branch_branch1
PREHOOK: type: QUERY
Expand Down Expand Up @@ -138,10 +138,10 @@ POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice01
PREHOOK: query: explain alter table ice01 replace branch branch1 as of branch branch3 retain 5 days
PREHOOK: type: ALTERTABLE_REPLACEBRANCH
PREHOOK: type: ALTERTABLE_REPLACESNAPSHOTREF
PREHOOK: Input: default@ice01
POSTHOOK: query: explain alter table ice01 replace branch branch1 as of branch branch3 retain 5 days
POSTHOOK: type: ALTERTABLE_REPLACEBRANCH
POSTHOOK: type: ALTERTABLE_REPLACESNAPSHOTREF
POSTHOOK: Input: default@ice01
STAGE DEPENDENCIES:
Stage-0 is a root stage
Expand All @@ -150,13 +150,13 @@ STAGE PLANS:
Stage: Stage-0
SnapshotRef Operation
table name: default.ice01
spec: AlterTableSnapshotRefSpec{operationType=REPLACE_BRANCH, operationParams=ReplaceSnapshotrefSpec{sourceBranch=branch1, targetBranch=branch3, maxRefAgeMs=432000000}}
spec: AlterTableSnapshotRefSpec{operationType=REPLACE_SNAPSHOTREF, operationParams=ReplaceSnapshotrefSpec{sourceRef=branch1, replace=Branch, targetBranch=branch3, maxRefAgeMs=432000000}}

PREHOOK: query: alter table ice01 replace branch branch1 as of branch branch3 retain 5 days
PREHOOK: type: ALTERTABLE_REPLACEBRANCH
PREHOOK: type: ALTERTABLE_REPLACESNAPSHOTREF
PREHOOK: Input: default@ice01
POSTHOOK: query: alter table ice01 replace branch branch1 as of branch branch3 retain 5 days
POSTHOOK: type: ALTERTABLE_REPLACEBRANCH
POSTHOOK: type: ALTERTABLE_REPLACESNAPSHOTREF
POSTHOOK: Input: default@ice01
PREHOOK: query: select * from default.ice01.branch_branch1
PREHOOK: type: QUERY
Expand Down Expand Up @@ -187,10 +187,10 @@ POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice01
PREHOOK: query: explain alter table ice01 replace branch branch1 as of branch branch4 with snapshot retention 5 snapshots 6 days
PREHOOK: type: ALTERTABLE_REPLACEBRANCH
PREHOOK: type: ALTERTABLE_REPLACESNAPSHOTREF
PREHOOK: Input: default@ice01
POSTHOOK: query: explain alter table ice01 replace branch branch1 as of branch branch4 with snapshot retention 5 snapshots 6 days
POSTHOOK: type: ALTERTABLE_REPLACEBRANCH
POSTHOOK: type: ALTERTABLE_REPLACESNAPSHOTREF
POSTHOOK: Input: default@ice01
STAGE DEPENDENCIES:
Stage-0 is a root stage
Expand All @@ -199,13 +199,13 @@ STAGE PLANS:
Stage: Stage-0
SnapshotRef Operation
table name: default.ice01
spec: AlterTableSnapshotRefSpec{operationType=REPLACE_BRANCH, operationParams=ReplaceSnapshotrefSpec{sourceBranch=branch1, targetBranch=branch4, minSnapshotsToKeep=5, maxSnapshotAgeMs=518400000}}
spec: AlterTableSnapshotRefSpec{operationType=REPLACE_SNAPSHOTREF, operationParams=ReplaceSnapshotrefSpec{sourceRef=branch1, replace=Branch, targetBranch=branch4, minSnapshotsToKeep=5, maxSnapshotAgeMs=518400000}}

PREHOOK: query: alter table ice01 replace branch branch1 as of branch branch4 with snapshot retention 5 snapshots 6 days
PREHOOK: type: ALTERTABLE_REPLACEBRANCH
PREHOOK: type: ALTERTABLE_REPLACESNAPSHOTREF
PREHOOK: Input: default@ice01
POSTHOOK: query: alter table ice01 replace branch branch1 as of branch branch4 with snapshot retention 5 snapshots 6 days
POSTHOOK: type: ALTERTABLE_REPLACEBRANCH
POSTHOOK: type: ALTERTABLE_REPLACESNAPSHOTREF
POSTHOOK: Input: default@ice01
PREHOOK: query: select * from default.ice01.branch_branch1
PREHOOK: type: QUERY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ public FileStatus getStat() {
return lazyStat();
}

public void setStat(FileStatus stat) {
this.stat = stat;
}

public Path getPath() {
return path;
}
Expand Down
Loading