
Commit b017a78

HIVE-27234: Iceberg: CREATE BRANCH SQL implementation
1 parent 36bd69e

File tree

17 files changed: +473, -2 lines changed

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
120120
static final EnumSet<AlterTableType> SUPPORTED_ALTER_OPS = EnumSet.of(
121121
AlterTableType.ADDCOLS, AlterTableType.REPLACE_COLUMNS, AlterTableType.RENAME_COLUMN,
122122
AlterTableType.ADDPROPS, AlterTableType.DROPPROPS, AlterTableType.SETPARTITIONSPEC,
123-
AlterTableType.UPDATE_COLUMNS, AlterTableType.RENAME, AlterTableType.EXECUTE);
123+
AlterTableType.UPDATE_COLUMNS, AlterTableType.RENAME, AlterTableType.EXECUTE, AlterTableType.CREATEBRANCH);
124124
private static final List<String> MIGRATION_ALLOWED_SOURCE_FORMATS = ImmutableList.of(
125125
FileFormat.PARQUET.name().toLowerCase(),
126126
FileFormat.ORC.name().toLowerCase(),
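
The single change above adds CREATEBRANCH to the whitelist the meta hook consults before allowing an ALTER TABLE against an Iceberg table. A sketch of the guard pattern such a set supports (illustrative only, not the hook's actual code; `op` is assumed to be the AlterTableType parsed from the current command):

// Sketch only: how a supported-operations whitelist is typically consulted.
if (!SUPPORTED_ALTER_OPS.contains(op)) {
  throw new UnsupportedOperationException("Unsupported ALTER TABLE operation on an Iceberg table: " + op);
}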

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 28 additions & 0 deletions
@@ -71,6 +71,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.parse.AlterTableCreateBranchSpec;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
 import org.apache.hadoop.hive.ql.parse.PartitionTransform;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -100,6 +101,7 @@
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManageSnapshots;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.NullOrder;
 import org.apache.iceberg.PartitionSpec;
@@ -676,6 +678,32 @@ public void executeOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
     }
   }

+  @Override
+  public void createBranchOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
+      AlterTableCreateBranchSpec createBranchSpec) {
+    TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
+    Table icebergTable = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
+
+    String branchName = createBranchSpec.getBranchName();
+    ManageSnapshots manageSnapshots = icebergTable.manageSnapshots();
+    Long snapshotId = Optional.ofNullable(createBranchSpec.getSnapshotId())
+        .orElse(icebergTable.currentSnapshot().snapshotId());
+    LOG.info("Creating branch {} on iceberg table {}.{}", branchName, hmsTable.getDbName(),
+        hmsTable.getTableName());
+    manageSnapshots.createBranch(branchName, snapshotId);
+    if (createBranchSpec.getMaxRefAgeMs() != null) {
+      manageSnapshots.setMaxRefAgeMs(branchName, createBranchSpec.getMaxRefAgeMs());
+    }
+    if (createBranchSpec.getMinSnapshotsToKeep() != null) {
+      manageSnapshots.setMinSnapshotsToKeep(branchName, createBranchSpec.getMinSnapshotsToKeep());
+    }
+    if (createBranchSpec.getMaxSnapshotAgeMs() != null) {
+      manageSnapshots.setMaxSnapshotAgeMs(branchName, createBranchSpec.getMaxSnapshotAgeMs());
+    }
+
+    manageSnapshots.commit();
+  }
+
   @Override
   public boolean isValidMetadataTable(String metaTableName) {
     return IcebergMetadataTables.isValidMetaTable(metaTableName);
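
For reference, a minimal standalone sketch of the Iceberg ManageSnapshots sequence the new createBranchOperation drives; `table` is assumed to be an already-loaded org.apache.iceberg.Table, and the branch name and retention values are illustrative:

import java.util.concurrent.TimeUnit;
import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.Table;

class CreateBranchSketch {
  static void createBranchWithRetention(Table table) {
    ManageSnapshots snapshots = table.manageSnapshots();
    // Branch off the current snapshot, as the handler does when no AS OF VERSION is given.
    snapshots.createBranch("test_branch_1", table.currentSnapshot().snapshotId());
    snapshots.setMaxRefAgeMs("test_branch_1", TimeUnit.DAYS.toMillis(5));      // RETAIN 5 DAYS
    snapshots.setMinSnapshotsToKeep("test_branch_1", 2);                       // WITH SNAPSHOT RETENTION 2 SNAPSHOTS
    snapshots.setMaxSnapshotAgeMs("test_branch_1", TimeUnit.DAYS.toMillis(2)); // ... 2 DAYS
    snapshots.commit();  // all ref changes land in a single metadata commit
  }
}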

iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergBranchOperation.java

Lines changed: 141 additions & 0 deletions (new file)

@@ -0,0 +1,141 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.junit.Assert;
import org.junit.Test;

public class TestHiveIcebergBranchOperation extends HiveIcebergStorageHandlerWithEngineBase {

  @Test
  public void testCreateBranchWithDefaultConfig() throws InterruptedException, IOException {
    Table table =
        testTables.createTableWithVersions(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
            fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

    String branchName = "test_branch_1";
    shell.executeStatement(String.format("ALTER TABLE customers CREATE BRANCH %s", branchName));
    table.refresh();
    SnapshotRef ref = table.refs().get(branchName);
    Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId());
    Assert.assertNull(ref.minSnapshotsToKeep());
    Assert.assertNull(ref.maxSnapshotAgeMs());
    Assert.assertNull(ref.maxRefAgeMs());
  }

  @Test
  public void testCreateBranchWithSnapshotId() throws InterruptedException, IOException {
    Table table =
        testTables.createTableWithVersions(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
            fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

    String branchName = "test_branch_1";
    Long snapshotId = table.history().get(0).snapshotId();
    shell.executeStatement(String.format("ALTER TABLE customers CREATE BRANCH %s AS OF VERSION %d",
        branchName, snapshotId));
    table.refresh();
    SnapshotRef ref = table.refs().get(branchName);
    Assert.assertEquals(snapshotId.longValue(), ref.snapshotId());
    Assert.assertNull(ref.minSnapshotsToKeep());
    Assert.assertNull(ref.maxSnapshotAgeMs());
    Assert.assertNull(ref.maxRefAgeMs());
  }

  @Test
  public void testCreateBranchWithMaxRefAge() throws InterruptedException, IOException {
    Table table =
        testTables.createTableWithVersions(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
            fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

    String branchName = "test_branch_1";
    long maxRefAge = 5L;
    shell.executeStatement(String.format("ALTER TABLE customers CREATE BRANCH %s RETAIN %d DAYS",
        branchName, maxRefAge));
    table.refresh();
    SnapshotRef ref = table.refs().get(branchName);
    Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId());
    Assert.assertNull(ref.minSnapshotsToKeep());
    Assert.assertNull(ref.maxSnapshotAgeMs());
    Assert.assertEquals(TimeUnit.DAYS.toMillis(maxRefAge), ref.maxRefAgeMs().longValue());
  }

  @Test
  public void testCreateBranchWithMinSnapshotsToKeep() throws InterruptedException, IOException {
    Table table =
        testTables.createTableWithVersions(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
            fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

    String branchName = "test_branch_1";
    Integer minSnapshotsToKeep = 2;
    shell.executeStatement(String.format("ALTER TABLE customers CREATE BRANCH %s WITH SNAPSHOT RETENTION %d SNAPSHOTS",
        branchName, minSnapshotsToKeep));
    table.refresh();
    SnapshotRef ref = table.refs().get(branchName);
    Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId());
    Assert.assertEquals(minSnapshotsToKeep, ref.minSnapshotsToKeep());
    Assert.assertNull(ref.maxSnapshotAgeMs());
    Assert.assertNull(ref.maxRefAgeMs());
  }

  @Test
  public void testCreateBranchWithMinSnapshotsToKeepAndMaxSnapshotAge() throws InterruptedException, IOException {
    Table table =
        testTables.createTableWithVersions(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
            fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

    String branchName = "test_branch_1";
    Integer minSnapshotsToKeep = 2;
    long maxSnapshotAge = 2L;
    shell.executeStatement(String.format("ALTER TABLE customers CREATE BRANCH %s WITH SNAPSHOT RETENTION %d SNAPSHOTS" +
        " %d DAYS", branchName, minSnapshotsToKeep, maxSnapshotAge));
    table.refresh();
    SnapshotRef ref = table.refs().get(branchName);
    Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId());
    Assert.assertEquals(minSnapshotsToKeep, ref.minSnapshotsToKeep());
    Assert.assertEquals(TimeUnit.DAYS.toMillis(maxSnapshotAge), ref.maxSnapshotAgeMs().longValue());
    Assert.assertNull(ref.maxRefAgeMs());
  }

  @Test
  public void testCreateBranchWithAllCustomConfig() throws InterruptedException, IOException {
    Table table =
        testTables.createTableWithVersions(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
            fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

    String branchName = "test_branch_1";
    Long snapshotId = table.history().get(0).snapshotId();
    Integer minSnapshotsToKeep = 2;
    long maxSnapshotAge = 2L;
    long maxRefAge = 5L;
    shell.executeStatement(String.format("ALTER TABLE customers CREATE BRANCH %s AS OF VERSION %d RETAIN %d DAYS WITH" +
        " SNAPSHOT RETENTION %d SNAPSHOTS %d days",
        branchName, snapshotId, maxRefAge, minSnapshotsToKeep, maxSnapshotAge));
    table.refresh();
    SnapshotRef ref = table.refs().get(branchName);
    Assert.assertEquals(snapshotId.longValue(), ref.snapshotId());
    Assert.assertEquals(minSnapshotsToKeep, ref.minSnapshotsToKeep());
    Assert.assertEquals(TimeUnit.DAYS.toMillis(maxSnapshotAge), ref.maxSnapshotAgeMs().longValue());
    Assert.assertEquals(TimeUnit.DAYS.toMillis(maxRefAge), ref.maxRefAgeMs().longValue());
  }
}

parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g

Lines changed: 29 additions & 0 deletions
@@ -74,6 +74,7 @@ alterTableStatementSuffix
     | alterStatementSuffixSetOwner
     | alterStatementSuffixSetPartSpec
     | alterStatementSuffixExecute
+    | alterStatementSuffixCreateBranch
     | alterStatementSuffixConvert
     ;

@@ -477,6 +478,34 @@ alterStatementSuffixExecute
     -> ^(TOK_ALTERTABLE_EXECUTE KW_SET_CURRENT_SNAPSHOT $snapshotParam)
     ;

+alterStatementSuffixCreateBranch
+@init { gParent.pushMsg("alter table create branch", state); }
+@after { gParent.popMsg(state); }
+    : KW_CREATE KW_BRANCH branchName=identifier snapshotIdOfBranch? branchRetain? retentionOfSnapshots?
+    -> ^(TOK_ALTERTABLE_CREATE_BRANCH $branchName snapshotIdOfBranch? branchRetain? retentionOfSnapshots?)
+    ;
+
+snapshotIdOfBranch
+@init { gParent.pushMsg("alter table create branch as of version", state); }
+@after { gParent.popMsg(state); }
+    : KW_AS KW_OF KW_VERSION snapshotId=Number
+    -> ^(TOK_AS_OF_VERSION_BRANCH $snapshotId)
+    ;
+
+branchRetain
+@init { gParent.pushMsg("alter table create branch RETAIN", state); }
+@after { gParent.popMsg(state); }
+    : KW_RETAIN maxRefAge=Number timeUnit=timeUnitQualifiers
+    -> ^(TOK_RETAIN $maxRefAge $timeUnit)
+    ;
+
+retentionOfSnapshots
+@init { gParent.pushMsg("alter table create branch WITH SNAPSHOT RETENTION", state); }
+@after { gParent.popMsg(state); }
+    : (KW_WITH KW_SNAPSHOT KW_RETENTION minSnapshotsToKeep=Number KW_SNAPSHOTS (maxSnapshotAge=Number timeUnit=timeUnitQualifiers)?)
+    -> ^(TOK_WITH_SNAPSHOT_RETENTION $minSnapshotsToKeep ($maxSnapshotAge $timeUnit)?)
+    ;
+
 fileFormat
 @init { gParent.pushMsg("file format specification", state); }
 @after { gParent.popMsg(state); }
parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g

Lines changed: 5 additions & 1 deletion
@@ -392,7 +392,11 @@ KW_SYSTEM_TIME: 'SYSTEM_TIME';
 KW_SYSTEM_VERSION: 'SYSTEM_VERSION';
 KW_EXPIRE_SNAPSHOTS: 'EXPIRE_SNAPSHOTS';
 KW_SET_CURRENT_SNAPSHOT: 'SET_CURRENT_SNAPSHOT';
-
+KW_BRANCH: 'BRANCH';
+KW_VERSION: 'VERSION';
+KW_SNAPSHOTS: 'SNAPSHOTS';
+KW_RETAIN: 'RETAIN';
+KW_RETENTION: 'RETENTION';

 // Operators
 // NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work.

parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g

Lines changed: 4 additions & 0 deletions
@@ -219,6 +219,10 @@ TOK_ALTERTABLE_UPDATECOLUMNS;
 TOK_ALTERTABLE_OWNER;
 TOK_ALTERTABLE_SETPARTSPEC;
 TOK_ALTERTABLE_EXECUTE;
+TOK_ALTERTABLE_CREATE_BRANCH;
+TOK_AS_OF_VERSION_BRANCH;
+TOK_RETAIN;
+TOK_WITH_SNAPSHOT_RETENTION;
 TOK_ALTERTABLE_CONVERT;
 TOK_MSCK;
 TOK_SHOWDATABASES;

parser/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g

Lines changed: 8 additions & 0 deletions
@@ -408,6 +408,13 @@ timeQualifiers
     | KW_SECOND -> Identifier["second"]
     ;

+timeUnitQualifiers
+    :
+    KW_DAY -> Identifier["days"]
+    | KW_HOUR -> Identifier["hours"]
+    | KW_MINUTE -> Identifier["minutes"]
+    ;
+
 constant
 @init { gParent.pushMsg("constant", state); }
 @after { gParent.popMsg(state); }
@@ -977,6 +984,7 @@ nonReserved
     | KW_SYSTEM_TIME | KW_SYSTEM_VERSION
     | KW_EXPIRE_SNAPSHOTS
     | KW_SET_CURRENT_SNAPSHOT
+    | KW_BRANCH | KW_VERSION | KW_SNAPSHOTS | KW_RETAIN | KW_RETENTION
     ;

 //The following SQL2011 reserved keywords are used as function name only, but not as identifiers.
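
timeUnitQualifiers rewrites the DAY/HOUR/MINUTE keywords into plural identifier text ("days", "hours", "minutes"), which the analyzer below converts via java.util.concurrent.TimeUnit; listing the new keywords under nonReserved keeps them usable as ordinary identifiers. A small sketch of that conversion (values are illustrative):

import java.util.Locale;
import java.util.concurrent.TimeUnit;

// For "RETAIN 5 DAYS" the grammar hands over the texts "5" and "days".
long maxRefAgeMs = TimeUnit.valueOf("days".toUpperCase(Locale.ENGLISH)).toMillis(Long.parseLong("5"));
// maxRefAgeMs == 432000000 (5 * 24 * 60 * 60 * 1000)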

ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AlterTableType.java

Lines changed: 1 addition & 0 deletions
@@ -40,6 +40,7 @@ public enum AlterTableType {
   ALTERPARTITION("alter partition"), // Note: this is never used in AlterTableDesc.
   SETPARTITIONSPEC("set partition spec"),
   EXECUTE("execute"),
+  CREATEBRANCH("create branch"),
   // constraint
   ADD_CONSTRAINT("add constraint"),
   DROP_CONSTRAINT("drop constraint"),

ql/src/java/org/apache/hadoop/hive/ql/ddl/table/branch/create/AlterTableCreateBranchAnalyzer.java

Lines changed: 89 additions & 0 deletions (new file)

@@ -0,0 +1,89 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hive.ql.ddl.table.branch.create;

import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.ddl.DDLSemanticAnalyzerFactory;
import org.apache.hadoop.hive.ql.ddl.DDLWork;
import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableAnalyzer;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.AlterTableCreateBranchSpec;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.SemanticException;

@DDLSemanticAnalyzerFactory.DDLType(types = HiveParser.TOK_ALTERTABLE_CREATE_BRANCH)
public class AlterTableCreateBranchAnalyzer extends AbstractAlterTableAnalyzer {

  public AlterTableCreateBranchAnalyzer(QueryState queryState) throws SemanticException {
    super(queryState);
  }

  @Override
  protected void analyzeCommand(TableName tableName, Map<String, String> partitionSpec, ASTNode command)
      throws SemanticException {
    Table table = getTable(tableName);
    validateAlterTableType(table, AlterTableType.CREATEBRANCH, false);
    if (!"ICEBERG".equalsIgnoreCase(table.getParameters().get("table_type"))) {
      throw new SemanticException("Cannot perform ALTER CREATE BRANCH statement on a non-Iceberg table.");
    }
    inputs.add(new ReadEntity(table));

    String branchName = command.getChild(0).getText();
    Long snapshotId = null;
    Long maxRefAgeMs = null;
    Integer minSnapshotsToKeep = null;
    Long maxSnapshotAgeMs = null;
    for (int i = 1; i < command.getChildCount(); i++) {
      ASTNode childNode = (ASTNode) command.getChild(i);
      switch (childNode.getToken().getType()) {
      case HiveParser.TOK_AS_OF_VERSION_BRANCH:
        snapshotId = Long.valueOf(childNode.getChild(0).getText());
        break;
      case HiveParser.TOK_RETAIN:
        String maxRefAge = childNode.getChild(0).getText();
        String timeUnitOfBranchRetain = childNode.getChild(1).getText();
        maxRefAgeMs =
            TimeUnit.valueOf(timeUnitOfBranchRetain.toUpperCase(Locale.ENGLISH)).toMillis(Long.parseLong(maxRefAge));
        break;
      case HiveParser.TOK_WITH_SNAPSHOT_RETENTION:
        minSnapshotsToKeep = Integer.valueOf(childNode.getChild(0).getText());
        if (childNode.getChildren().size() > 1) {
          String maxSnapshotAge = childNode.getChild(1).getText();
          String timeUnitOfSnapshotsRetention = childNode.getChild(2).getText();
          maxSnapshotAgeMs = TimeUnit.valueOf(timeUnitOfSnapshotsRetention.toUpperCase(Locale.ENGLISH))
              .toMillis(Long.parseLong(maxSnapshotAge));
        }
        break;
      default:
        throw new SemanticException("Unrecognized token in ALTER CREATE BRANCH statement");
      }
    }

    AlterTableCreateBranchSpec spec =
        new AlterTableCreateBranchSpec(branchName, snapshotId, maxRefAgeMs, minSnapshotsToKeep, maxSnapshotAgeMs);
    AlterTableCreateBranchDesc desc = new AlterTableCreateBranchDesc(tableName, spec);
    rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
  }
}
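
End to end, a fully-specified statement maps onto the spec as sketched below; the AST shape is inferred from the grammar rules above, and the values are illustrative:

// ALTER TABLE customers CREATE BRANCH b1
//   AS OF VERSION 123 RETAIN 5 DAYS WITH SNAPSHOT RETENTION 2 SNAPSHOTS 3 DAYS
//
// parses (roughly) to:
//   TOK_ALTERTABLE_CREATE_BRANCH
//     b1
//     TOK_AS_OF_VERSION_BRANCH    -> 123
//     TOK_RETAIN                  -> 5 days
//     TOK_WITH_SNAPSHOT_RETENTION -> 2 (3 days)
//
// which the loop above folds into:
AlterTableCreateBranchSpec spec = new AlterTableCreateBranchSpec(
    "b1", 123L, TimeUnit.DAYS.toMillis(5), 2, TimeUnit.DAYS.toMillis(3));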
