2 changes: 1 addition & 1 deletion common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -478,7 +478,7 @@ public enum ErrorMsg {
DATACONNECTOR_NOT_EXISTS(10428, "Dataconnector does not exist:"),
TIME_TRAVEL_NOT_ALLOWED(10429, "Time travel is not allowed for {0}. Please choose a storage format which supports the feature.", true),
INVALID_METADATA_TABLE_NAME(10430, "Invalid metadata table name {0}.", true),
METADATA_TABLE_NOT_SUPPORTED(10431, "Metadata tables are not supported for table {0}.", true),
TABLE_META_REF_NOT_SUPPORTED(10431, "Table Meta Ref extension is not supported for table {0}.", true),
COMPACTION_REFUSED(10432, "Compaction request for {0}.{1}{2} is refused, details: {3}.", true),
CBO_IS_REQUIRED(10433,
"The following functionality requires CBO (" + HiveConf.ConfVars.HIVE_CBO_ENABLED.varname + "): {0}", true),
@@ -69,6 +69,7 @@ public final class Catalogs {

public static final String NAME = "name";
public static final String LOCATION = "location";
public static final String BRANCH_NAME = "branch_name";

private static final String NO_CATALOG_TYPE = "no catalog";
private static final Set<String> PROPERTIES_TO_REMOVE =
@@ -85,6 +85,7 @@ private InputFormatConfig() {
public static final boolean CONFIG_SERIALIZATION_DISABLED_DEFAULT = true;
public static final String OPERATION_TYPE_PREFIX = "iceberg.mr.operation.type.";
public static final String OUTPUT_TABLES = "iceberg.mr.output.tables";
public static final String OUTPUT_TABLE_BRANCH = "iceberg.mr.output.table.branch";
public static final String COMMIT_TABLE_THREAD_POOL_SIZE = "iceberg.mr.commit.table.thread.pool.size";
public static final int COMMIT_TABLE_THREAD_POOL_SIZE_DEFAULT = 10;
public static final String COMMIT_FILE_THREAD_POOL_SIZE = "iceberg.mr.commit.file.thread.pool.size";
@@ -143,6 +143,7 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
job.set(InputFormatConfig.AS_OF_TIMESTAMP, job.get(TableScanDesc.AS_OF_TIMESTAMP, "-1"));
job.set(InputFormatConfig.SNAPSHOT_ID, job.get(TableScanDesc.AS_OF_VERSION, "-1"));
job.set(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, job.get(TableScanDesc.FROM_VERSION, "-1"));
job.set(InputFormatConfig.OUTPUT_TABLE_BRANCH, job.get(TableScanDesc.BRANCH_NAME, ""));

String location = job.get(InputFormatConfig.TABLE_LOCATION);
return Arrays.stream(super.getSplits(job, numSplits))
@@ -36,11 +36,13 @@
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.mapred.JobConf;
@@ -431,16 +433,17 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output

FilesForCommit writeResults = collectResults(
numTasks, executor, outputTable.table.location(), jobContext, io, true);
String branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_BRANCH);
if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
if (writeResults.isEmpty()) {
LOG.info(
"Not creating a new commit for table: {}, jobID: {}, operation: {}, since there were no new files to add",
table, jobContext.getJobID(), HiveCustomStorageHandlerUtils.getWriteOperation(conf, name));
} else {
commitWrite(table, startTime, writeResults);
commitWrite(table, branchName, startTime, writeResults);
}
} else {
commitOverwrite(table, startTime, writeResults);
commitOverwrite(table, branchName, startTime, writeResults);
}
}

@@ -451,15 +454,21 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
* @param startTime The start time of the commit - used only for logging
* @param results The object containing the new files we would like to add to the table
*/
private void commitWrite(Table table, long startTime, FilesForCommit results) {
private void commitWrite(Table table, String branchName, long startTime, FilesForCommit results) {
if (results.deleteFiles().isEmpty()) {
AppendFiles write = table.newAppend();
results.dataFiles().forEach(write::appendFile);
if (StringUtils.isNotEmpty(branchName)) {
write.toBranch(HiveUtils.getTableBranch(branchName));
}
write.commit();
} else {
RowDelta write = table.newRowDelta();
results.dataFiles().forEach(write::addRows);
results.deleteFiles().forEach(write::addDeletes);
if (StringUtils.isNotEmpty(branchName)) {
write.toBranch(HiveUtils.getTableBranch(branchName));
}
write.commit();
}

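// Reviewer sketch (illustrative only, not part of this PR): commitWrite above and commitOverwrite
// below repeat the same branch-routing step on Iceberg's SnapshotUpdate API, which AppendFiles,
// RowDelta, ReplacePartitions and DeleteFiles all extend. A minimal, assumed helper expressing that
// pattern, given a branch name already resolved through HiveUtils.getTableBranch:
private static <T extends SnapshotUpdate<T>> void commitToBranchOrMain(T update, String branch) {
  if (StringUtils.isNotEmpty(branch)) {
    update.toBranch(branch);  // write the new snapshot onto the named branch ref
  }
  update.commit();            // with no branch set, the snapshot lands on the table's main branch
}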
@@ -478,17 +487,23 @@ private void commitWrite(Table table, long startTime, FilesForCommit results) {
* @param startTime The start time of the commit - used only for logging
* @param results The object containing the new files
*/
private void commitOverwrite(Table table, long startTime, FilesForCommit results) {
private void commitOverwrite(Table table, String branchName, long startTime, FilesForCommit results) {
Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not handle deletes with overwrite");
if (!results.dataFiles().isEmpty()) {
ReplacePartitions overwrite = table.newReplacePartitions();
results.dataFiles().forEach(overwrite::addFile);
if (StringUtils.isNotEmpty(branchName)) {
overwrite.toBranch(HiveUtils.getTableBranch(branchName));
}
overwrite.commit();
LOG.info("Overwrite commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime,
table, results.dataFiles().size());
} else if (table.spec().isUnpartitioned()) {
DeleteFiles deleteFiles = table.newDelete();
deleteFiles.deleteFromRowFilter(Expressions.alwaysTrue());
if (StringUtils.isNotEmpty(branchName)) {
deleteFiles.toBranch(HiveUtils.getTableBranch(branchName));
}
deleteFiles.commit();
LOG.info("Cleared table contents as part of empty overwrite for unpartitioned table. " +
"Commit took {} ms for table: {}", System.currentTimeMillis() - startTime, table);
@@ -59,6 +59,7 @@
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.Context.Operation;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableDesc;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
@@ -75,6 +76,7 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.parse.AlterTableBranchSpec;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
@@ -633,11 +635,12 @@ public boolean commitInMoveTask() {
public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
String tableName = commitProperties.getProperty(Catalogs.NAME);
String location = commitProperties.getProperty(Catalogs.LOCATION);
String branchName = commitProperties.getProperty(Catalogs.BRANCH_NAME);
Configuration configuration = SessionState.getSessionConf();
if (location != null) {
HiveTableUtil.cleanupTableObjectFile(location, configuration);
}
List<JobContext> jobContextList = generateJobContext(configuration, tableName, overwrite);
List<JobContext> jobContextList = generateJobContext(configuration, tableName, branchName, overwrite);
if (jobContextList.isEmpty()) {
return;
}
@@ -678,7 +681,7 @@ public boolean isTimeTravelAllowed() {
}

@Override
public boolean isMetadataTableSupported() {
public boolean isTableMetaRefSupported() {
return true;
}

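// Reviewer note (hypothetical caller shape, outside this diff): for a storage handler whose
// isTableMetaRefSupported() stays false, the analyzer would presumably raise the renamed
// ErrorMsg.TABLE_META_REF_NOT_SUPPORTED (10431) instead of resolving the ref, along the lines of:
//
//   if (!storageHandler.isTableMetaRefSupported()) {
//     throw new SemanticException(ErrorMsg.TABLE_META_REF_NOT_SUPPORTED, tableName);
//   }
//   hmsTable = storageHandler.checkAndSetTableMetaRef(hmsTable, tableMetaRef);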
@@ -768,6 +771,25 @@ public boolean isValidMetadataTable(String metaTableName) {
return IcebergMetadataTables.isValidMetaTable(metaTableName);
}

@Override
public org.apache.hadoop.hive.ql.metadata.Table checkAndSetTableMetaRef(
org.apache.hadoop.hive.ql.metadata.Table hmsTable, String tableMetaRef) throws SemanticException {
String branch = HiveUtils.getTableBranch(tableMetaRef);
if (branch != null) {
Table tbl = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
if (tbl.snapshot(branch) != null) {
hmsTable.setBranchName(tableMetaRef);
return hmsTable;
}
throw new SemanticException(String.format("Cannot use branch (does not exist): %s", branch));
}
if (IcebergMetadataTables.isValidMetaTable(tableMetaRef)) {
hmsTable.setMetaTable(tableMetaRef);
return hmsTable;
}
throw new SemanticException(ErrorMsg.INVALID_METADATA_TABLE_NAME, tableMetaRef);
}

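// Illustrative only -- an assumed shape for HiveUtils.getTableBranch, which checkAndSetTableMetaRef
// above relies on to turn a table meta ref such as "branch_test1" (see the qtest below) into the
// plain branch name. The real helper lives in HiveUtils and may differ in detail:
public static String getTableBranch(String metaRef) {
  final String prefix = "branch_";
  if (metaRef != null && metaRef.startsWith(prefix)) {
    return metaRef.substring(prefix.length());  // "branch_test1" -> "test1"
  }
  return null;  // not a branch reference; callers then try the metadata-table path
}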
@Override
public URI getURIForAuth(org.apache.hadoop.hive.metastore.api.Table hmsTable) throws URISyntaxException {
String dbName = hmsTable.getDbName();
@@ -1252,7 +1274,7 @@ private static boolean hasParquetNestedTypeWithinListOrMap(Properties tableProps
* @return The generated Optional JobContext list, or an empty list if none is present.
*/
private List<JobContext> generateJobContext(Configuration configuration, String tableName,
boolean overwrite) {
String branchName, boolean overwrite) {
JobConf jobConf = new JobConf(configuration);
Optional<Map<String, SessionStateUtil.CommitInfo>> commitInfoMap =
SessionStateUtil.getCommitInfo(jobConf, tableName);
@@ -1266,6 +1288,9 @@ private List<JobContext> generateJobContext(Configuration configuration, String
// we should only commit this current table because
// for multi-table inserts, this hook method will be called sequentially for each target table
jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
if (branchName != null) {
jobConf.set(InputFormatConfig.OUTPUT_TABLE_BRANCH, branchName);
}

jobContextList.add(new JobContextImpl(jobConf, jobID, null));
}
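// Reviewer note (illustrative values, not part of this PR): the branch value placed in the JobConf
// here appears to be the raw table meta ref captured by checkAndSetTableMetaRef, so it still carries
// the "branch_" prefix; the prefix is only stripped at the use sites via HiveUtils.getTableBranch:
//
//   jobConf.set(InputFormatConfig.OUTPUT_TABLE_BRANCH, "branch_test1");        // set in this hook
//   String branch = HiveUtils.getTableBranch(
//       jobConf.get(InputFormatConfig.OUTPUT_TABLE_BRANCH));                   // -> "test1"
//   newAppend.toBranch(branch);                                                // as in commitWrite above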
@@ -30,10 +30,12 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.llap.LlapHiveUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -126,6 +128,10 @@ private static TableScan createTableScan(Table table, Configuration conf) {
}
snapshotId = ref.snapshotId();
}
String branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_BRANCH);
if (StringUtils.isNotEmpty(branchName)) {
scan = scan.useRef(HiveUtils.getTableBranch(branchName));
}
if (snapshotId != -1) {
scan = scan.useSnapshot(snapshotId);
}
@@ -0,0 +1,4 @@
create external table ice01(a int, b string, c int) stored by iceberg;

-- insert into branch test1 which does not exist
insert into default.ice01.branch_test1 values(11, 'one', 22);
@@ -0,0 +1,60 @@
-- SORT_QUERY_RESULTS
set hive.explain.user=false;
set hive.fetch.task.conversion=more;

create external table ice01(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2');
create table source01(a int, b string, c int);

insert into ice01 values (1, 'one', 50), (2, 'two', 51), (111, 'one', 55);
insert into source01 values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54), (111, 'one', 55);

-- create a branch named test1
alter table ice01 create branch test1;

-- query branch using table identifier: db.tbl.branch_branchName
explain select * from default.ice01.branch_test1;
select * from default.ice01.branch_test1;
-- query branch using time travel syntax
select * from ice01 for system_version as of 'test1';

-- insert into branch test1
explain insert into default.ice01.branch_test1 values(22, 'three', 44), (33, 'three', 66);
insert into default.ice01.branch_test1 values(22, 'three', 44), (33, 'three', 66);
select * from default.ice01.branch_test1;

-- delete from branch test1
explain delete from default.ice01.branch_test1 where a=22;
delete from default.ice01.branch_test1 where a=22;
select * from default.ice01.branch_test1;

-- update branch test1
explain update default.ice01.branch_test1 set a=33 where c=66;
update default.ice01.branch_test1 set a=33 where c=66;
select * from default.ice01.branch_test1;

-- merge into branch test1
explain
merge into default.ice01.branch_test1 as t using source01 src ON t.a = src.a
when matched and t.a > 100 THEN DELETE
when matched then update set b = 'Merged', c = t.c + 10
when not matched then insert values (src.a, src.b, src.c);

merge into default.ice01.branch_test1 as t using source01 src ON t.a = src.a
when matched and t.a > 100 THEN DELETE
when matched then update set b = 'Merged', c = t.c + 10
when not matched then insert values (src.a, src.b, src.c);

select * from default.ice01.branch_test1;

-- insert overwrite branch test1
explain insert overwrite table default.ice01.branch_test1 values (77, 'one', 88);
insert overwrite table default.ice01.branch_test1 values (77, 'one', 88);
select * from default.ice01.branch_test1;

-- query branch using non-fetch task
set hive.fetch.task.conversion=none;
explain select * from default.ice01.branch_test1;
select * from default.ice01.branch_test1;

drop table ice01;
drop table source01;
@@ -0,0 +1,9 @@
PREHOOK: query: create external table ice01(a int, b string, c int) stored by iceberg
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@ice01
POSTHOOK: query: create external table ice01(a int, b string, c int) stored by iceberg
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@ice01
FAILED: SemanticException Cannot use branch (does not exist): test1