
Commit e0bf960

HIVE-27302: Iceberg: Support write to iceberg branch (#4292). (Butao Zhang, reviewed by Denys Kuzmenko, Ayush Saxena)
1 parent 8899600 commit e0bf960

31 files changed, +1172 -75 lines changed

common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java

Lines changed: 1 addition & 1 deletion
@@ -478,7 +478,7 @@ public enum ErrorMsg {
   DATACONNECTOR_NOT_EXISTS(10428, "Dataconnector does not exist:"),
   TIME_TRAVEL_NOT_ALLOWED(10429, "Time travel is not allowed for {0}. Please choose a storage format which supports the feature.", true),
   INVALID_METADATA_TABLE_NAME(10430, "Invalid metadata table name {0}.", true),
-  METADATA_TABLE_NOT_SUPPORTED(10431, "Metadata tables are not supported for table {0}.", true),
+  TABLE_META_REF_NOT_SUPPORTED(10431, "Table Meta Ref extension is not supported for table {0}.", true),
   COMPACTION_REFUSED(10432, "Compaction request for {0}.{1}{2} is refused, details: {3}.", true),
   CBO_IS_REQUIRED(10433,
       "The following functionality requires CBO (" + HiveConf.ConfVars.HIVE_CBO_ENABLED.varname + "): {0}", true),

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java

Lines changed: 1 addition & 0 deletions
@@ -69,6 +69,7 @@ public final class Catalogs {

   public static final String NAME = "name";
   public static final String LOCATION = "location";
+  public static final String BRANCH_NAME = "branch_name";

   private static final String NO_CATALOG_TYPE = "no catalog";
   private static final Set<String> PROPERTIES_TO_REMOVE =

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java

Lines changed: 1 addition & 0 deletions
@@ -85,6 +85,7 @@ private InputFormatConfig() {
   public static final boolean CONFIG_SERIALIZATION_DISABLED_DEFAULT = true;
   public static final String OPERATION_TYPE_PREFIX = "iceberg.mr.operation.type.";
   public static final String OUTPUT_TABLES = "iceberg.mr.output.tables";
+  public static final String OUTPUT_TABLE_BRANCH = "iceberg.mr.output.table.branch";
   public static final String COMMIT_TABLE_THREAD_POOL_SIZE = "iceberg.mr.commit.table.thread.pool.size";
   public static final int COMMIT_TABLE_THREAD_POOL_SIZE_DEFAULT = 10;
   public static final String COMMIT_FILE_THREAD_POOL_SIZE = "iceberg.mr.commit.file.thread.pool.size";
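The new iceberg.mr.output.table.branch key is how the target branch travels from the compiler to the output committer and the split planner via the job configuration. A minimal sketch of both sides, assuming `conf` is a JobConf (the actual set/get calls appear in the HiveIcebergStorageHandler and HiveIcebergOutputCommitter hunks below):

    // Writer side (see generateJobContext below): record the branch ref.
    conf.set(InputFormatConfig.OUTPUT_TABLE_BRANCH, "branch_test1");
    // Committer/reader side (see commitTable and createTableScan below): read it back.
    String branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_BRANCH);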

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java

Lines changed: 1 addition & 0 deletions
@@ -143,6 +143,7 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     job.set(InputFormatConfig.AS_OF_TIMESTAMP, job.get(TableScanDesc.AS_OF_TIMESTAMP, "-1"));
     job.set(InputFormatConfig.SNAPSHOT_ID, job.get(TableScanDesc.AS_OF_VERSION, "-1"));
     job.set(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, job.get(TableScanDesc.FROM_VERSION, "-1"));
+    job.set(InputFormatConfig.OUTPUT_TABLE_BRANCH, job.get(TableScanDesc.BRANCH_NAME, ""));

     String location = job.get(InputFormatConfig.TABLE_LOCATION);
     return Arrays.stream(super.getSplits(job, numSplits))

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java

Lines changed: 19 additions & 4 deletions
@@ -36,11 +36,13 @@
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 import org.apache.hadoop.mapred.JobConf;

@@ -431,16 +433,17 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output

     FilesForCommit writeResults = collectResults(
         numTasks, executor, outputTable.table.location(), jobContext, io, true);
+    String branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_BRANCH);
     if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
       if (writeResults.isEmpty()) {
         LOG.info(
             "Not creating a new commit for table: {}, jobID: {}, operation: {}, since there were no new files to add",
             table, jobContext.getJobID(), HiveCustomStorageHandlerUtils.getWriteOperation(conf, name));
       } else {
-        commitWrite(table, startTime, writeResults);
+        commitWrite(table, branchName, startTime, writeResults);
       }
     } else {
-      commitOverwrite(table, startTime, writeResults);
+      commitOverwrite(table, branchName, startTime, writeResults);
     }
   }

@@ -451,15 +454,21 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
    * @param startTime The start time of the commit - used only for logging
    * @param results The object containing the new files we would like to add to the table
    */
-  private void commitWrite(Table table, long startTime, FilesForCommit results) {
+  private void commitWrite(Table table, String branchName, long startTime, FilesForCommit results) {
     if (results.deleteFiles().isEmpty()) {
       AppendFiles write = table.newAppend();
       results.dataFiles().forEach(write::appendFile);
+      if (StringUtils.isNotEmpty(branchName)) {
+        write.toBranch(HiveUtils.getTableBranch(branchName));
+      }
       write.commit();
     } else {
       RowDelta write = table.newRowDelta();
       results.dataFiles().forEach(write::addRows);
       results.deleteFiles().forEach(write::addDeletes);
+      if (StringUtils.isNotEmpty(branchName)) {
+        write.toBranch(HiveUtils.getTableBranch(branchName));
+      }
       write.commit();
     }

@@ -478,17 +487,23 @@ private void commitWrite(Table table, long startTime, FilesForCommit results) {
    * @param startTime The start time of the commit - used only for logging
    * @param results The object containing the new files
    */
-  private void commitOverwrite(Table table, long startTime, FilesForCommit results) {
+  private void commitOverwrite(Table table, String branchName, long startTime, FilesForCommit results) {
     Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not handle deletes with overwrite");
     if (!results.dataFiles().isEmpty()) {
       ReplacePartitions overwrite = table.newReplacePartitions();
       results.dataFiles().forEach(overwrite::addFile);
+      if (StringUtils.isNotEmpty(branchName)) {
+        overwrite.toBranch(HiveUtils.getTableBranch(branchName));
+      }
       overwrite.commit();
       LOG.info("Overwrite commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime,
           table, results.dataFiles().size());
     } else if (table.spec().isUnpartitioned()) {
       DeleteFiles deleteFiles = table.newDelete();
       deleteFiles.deleteFromRowFilter(Expressions.alwaysTrue());
+      if (StringUtils.isNotEmpty(branchName)) {
+        deleteFiles.toBranch(HiveUtils.getTableBranch(branchName));
+      }
       deleteFiles.commit();
       LOG.info("Cleared table contents as part of empty overwrite for unpartitioned table. " +
           "Commit took {} ms for table: {}", System.currentTimeMillis() - startTime, table);

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 28 additions & 3 deletions
@@ -59,6 +59,7 @@
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.Context.Operation;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableDesc;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;

@@ -75,6 +76,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.parse.AlterTableBranchSpec;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;

@@ -633,11 +635,12 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     String location = commitProperties.getProperty(Catalogs.LOCATION);
+    String branchName = commitProperties.getProperty(Catalogs.BRANCH_NAME);
     Configuration configuration = SessionState.getSessionConf();
     if (location != null) {
       HiveTableUtil.cleanupTableObjectFile(location, configuration);
     }
-    List<JobContext> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    List<JobContext> jobContextList = generateJobContext(configuration, tableName, branchName, overwrite);
     if (jobContextList.isEmpty()) {
       return;
     }

@@ -678,7 +681,7 @@ public boolean isTimeTravelAllowed() {
   }

   @Override
-  public boolean isMetadataTableSupported() {
+  public boolean isTableMetaRefSupported() {
     return true;
   }

@@ -768,6 +771,25 @@ public boolean isValidMetadataTable(String metaTableName) {
     return IcebergMetadataTables.isValidMetaTable(metaTableName);
   }

+  @Override
+  public org.apache.hadoop.hive.ql.metadata.Table checkAndSetTableMetaRef(
+      org.apache.hadoop.hive.ql.metadata.Table hmsTable, String tableMetaRef) throws SemanticException {
+    String branch = HiveUtils.getTableBranch(tableMetaRef);
+    if (branch != null) {
+      Table tbl = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
+      if (tbl.snapshot(branch) != null) {
+        hmsTable.setBranchName(tableMetaRef);
+        return hmsTable;
+      }
+      throw new SemanticException(String.format("Cannot use branch (does not exist): %s", branch));
+    }
+    if (IcebergMetadataTables.isValidMetaTable(tableMetaRef)) {
+      hmsTable.setMetaTable(tableMetaRef);
+      return hmsTable;
+    }
+    throw new SemanticException(ErrorMsg.INVALID_METADATA_TABLE_NAME, tableMetaRef);
+  }
+
   @Override
   public URI getURIForAuth(org.apache.hadoop.hive.metastore.api.Table hmsTable) throws URISyntaxException {
     String dbName = hmsTable.getDbName();

@@ -1252,7 +1274,7 @@ private static boolean hasParquetNestedTypeWithinListOrMap(Properties tableProps
    * @return The generated Optional JobContext list or empty if not presents.
    */
   private List<JobContext> generateJobContext(Configuration configuration, String tableName,
-      boolean overwrite) {
+      String branchName, boolean overwrite) {
     JobConf jobConf = new JobConf(configuration);
     Optional<Map<String, SessionStateUtil.CommitInfo>> commitInfoMap =
         SessionStateUtil.getCommitInfo(jobConf, tableName);

@@ -1266,6 +1288,9 @@ private List<JobContext> generateJobContext(Configuration configuration, String
       // we should only commit this current table because
       // for multi-table inserts, this hook method will be called sequentially for each target table
       jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
+      if (branchName != null) {
+        jobConf.set(InputFormatConfig.OUTPUT_TABLE_BRANCH, branchName);
+      }

       jobContextList.add(new JobContextImpl(jobConf, jobID, null));
     }

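The new checkAndSetTableMetaRef hook disambiguates the third identifier component: default.ice01.branch_test1 resolves to a branch (validated against the Iceberg table's refs), while names like default.ice01.files still resolve to metadata tables. HiveUtils.getTableBranch itself is added elsewhere in this commit and is not shown here; the sketch below is a hypothetical reconstruction of its contract, inferred from its call sites in this diff (it appears to strip the "branch_" prefix and return null for anything else):

    // Hypothetical sketch, not the actual HiveUtils implementation.
    static String getTableBranch(String tableMetaRef) {
      String prefix = "branch_";
      if (tableMetaRef != null && tableMetaRef.startsWith(prefix)) {
        return tableMetaRef.substring(prefix.length()); // "branch_test1" -> "test1"
      }
      return null; // e.g. a metadata table name such as "files"
    }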
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java

Lines changed: 6 additions & 0 deletions
@@ -30,10 +30,12 @@
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.llap.LlapHiveUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;

@@ -126,6 +128,10 @@ private static TableScan createTableScan(Table table, Configuration conf) {
       }
       snapshotId = ref.snapshotId();
     }
+    String branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_BRANCH);
+    if (StringUtils.isNotEmpty(branchName)) {
+      scan = scan.useRef(HiveUtils.getTableBranch(branchName));
+    }
     if (snapshotId != -1) {
      scan = scan.useSnapshot(snapshotId);
    }
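On the read path the branch identifier is turned into a scan of the branch ref via TableScan.useRef, so SELECTs against default.ice01.branch_test1 read the branch head rather than main. A minimal sketch of the same API outside Hive (the loaded table and the existing "test1" ref are assumptions):

    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableScan;
    import org.apache.iceberg.io.CloseableIterable;

    public final class BranchScanSketch {
      // Plan a scan against the head of an existing named ref instead of main.
      static CloseableIterable<FileScanTask> planBranchScan(Table table, String branch) {
        TableScan scan = table.newScan().useRef(branch);
        return scan.planFiles();
      }
    }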
Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+create external table ice01(a int, b string, c int) stored by iceberg;
+
+-- insert into branch test1 which does not exist
+insert into default.ice01.branch_test1 values(11, 'one', 22);
Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
+-- SORT_QUERY_RESULTS
+set hive.explain.user=false;
+set hive.fetch.task.conversion=more;
+
+create external table ice01(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2');
+create table source01(a int, b string, c int);
+
+insert into ice01 values (1, 'one', 50), (2, 'two', 51), (111, 'one', 55);
+insert into source01 values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54), (111, 'one', 55);
+
+-- create a branch named test1
+alter table ice01 create branch test1;
+
+-- query branch using table identifier: db.tbl.branch_branchName
+explain select * from default.ice01.branch_test1;
+select * from default.ice01.branch_test1;
+-- query branch using time travel syntax
+select * from ice01 for system_version as of 'test1';
+
+-- insert into branch test1
+explain insert into default.ice01.branch_test1 values(22, 'three', 44), (33, 'three', 66);
+insert into default.ice01.branch_test1 values(22, 'three', 44), (33, 'three', 66);
+select * from default.ice01.branch_test1;
+
+-- delete from branch test1
+explain delete from default.ice01.branch_test1 where a=22;
+delete from default.ice01.branch_test1 where a=22;
+select * from default.ice01.branch_test1;
+
+-- update branch test1
+explain update default.ice01.branch_test1 set a=33 where c=66;
+update default.ice01.branch_test1 set a=33 where c=66;
+select * from default.ice01.branch_test1;
+
+-- merge into branch test1
+explain
+merge into default.ice01.branch_test1 as t using source01 src ON t.a = src.a
+when matched and t.a > 100 THEN DELETE
+when matched then update set b = 'Merged', c = t.c + 10
+when not matched then insert values (src.a, src.b, src.c);
+
+merge into default.ice01.branch_test1 as t using source01 src ON t.a = src.a
+when matched and t.a > 100 THEN DELETE
+when matched then update set b = 'Merged', c = t.c + 10
+when not matched then insert values (src.a, src.b, src.c);
+
+select * from default.ice01.branch_test1;
+
+-- insert overwrite branch test1
+explain insert overwrite table default.ice01.branch_test1 values (77, 'one', 88);
+insert overwrite table default.ice01.branch_test1 values (77, 'one', 88);
+select * from default.ice01.branch_test1;
+
+-- query branch using non-fetch task
+set hive.fetch.task.conversion=none;
+explain select * from default.ice01.branch_test1;
+select * from default.ice01.branch_test1;
+
+drop table ice01;
+drop table source01;
Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+PREHOOK: query: create external table ice01(a int, b string, c int) stored by iceberg
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ice01
+POSTHOOK: query: create external table ice01(a int, b string, c int) stored by iceberg
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ice01
+FAILED: SemanticException Cannot use branch (does not exist): test1
