Merged
HiveIcebergOutputCommitter.java
@@ -41,7 +41,6 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context.Operation;
import org.apache.hadoop.hive.ql.Context.RewritePolicy;
@@ -58,7 +57,6 @@
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.ContentFile;
@@ -84,7 +82,6 @@
import org.apache.iceberg.mr.hive.compaction.IcebergCompactionUtil;
import org.apache.iceberg.mr.hive.writer.HiveIcebergWriter;
import org.apache.iceberg.mr.hive.writer.WriterRegistry;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -106,7 +103,6 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class);

private static final Splitter TABLE_NAME_SPLITTER = Splitter.on("..");
private static final String FOR_COMMIT_EXTENSION = ".forCommit";
private static final String CONFLICT_DETECTION_FILTER = "Conflict detection Filter Expression: {}";

private ExecutorService workerPool;
@@ -159,7 +155,7 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
.run(output -> {
Table table = HiveTableUtil.deserializeTable(context.getJobConf(), output);
if (table != null) {
String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
String fileForCommitLocation = HiveTableUtil.fileForCommitLocation(table.location(), jobConf,
attemptID.getJobID(), attemptID.getTaskID().getId());
if (writers.get(output) != null) {
List<DataFile> dataFiles = Lists.newArrayList();
@@ -288,7 +284,7 @@ public void commitJobs(List<JobContext> originalContextList, Operation operation
final Collection<JobContext> jobContexts = outputs.get(output);
final Table table = output.table;
jobContexts.forEach(jobContext -> jobLocations.add(
generateJobLocation(table.location(), jobConf, jobContext.getJobID()))
HiveTableUtil.jobLocation(table.location(), jobConf, jobContext.getJobID()))
);
commitTable(table.io(), fileExecutor, output, jobContexts, operation);
});
@@ -360,7 +356,7 @@ public void abortJobs(List<JobContext> originalContextList) throws IOException {
for (JobContext jobContext : outputs.get(output)) {
LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output);
Table table = output.table;
String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
String jobLocation = HiveTableUtil.jobLocation(table.location(), jobConf, jobContext.getJobID());
jobLocations.add(jobLocation);
// list jobLocation to get number of forCommit files
// we do this because map/reduce num in jobConf is unreliable and we have no access to vertex status info
@@ -404,7 +400,8 @@ private static Set<FileStatus> listForCommits(JobConf jobConf, String jobLocatio
FileStatus[] children = path.getFileSystem(jobConf).listStatus(path);
LOG.debug("Listing the job location: {} yielded these files: {}", jobLocation, Arrays.toString(children));
return Arrays.stream(children)
.filter(child -> !child.isDirectory() && child.getPath().getName().endsWith(FOR_COMMIT_EXTENSION))
.filter(child -> !child.isDirectory() &&
child.getPath().getName().endsWith(HiveTableUtil.FOR_COMMIT_EXTENSION))
.collect(Collectors.toSet());
}

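The listing above exists because, as the inline comments note, the map/reduce task counts in the JobConf are unreliable, so the committer recovers the number of task attempts by listing the job's temp directory and keeping only regular files with the `.forCommit` suffix. Below is a minimal, self-contained sketch of that listing-and-filtering step; it uses plain java.nio in place of the Hadoop FileSystem API, and the directory and file names are illustrative placeholders, not paths from the patch.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch of the listForCommits() idea: count task manifests by listing the job
// temp directory and filtering on the ".forCommit" suffix.
public class ForCommitListingSketch {
  static final String FOR_COMMIT_EXTENSION = ".forCommit";

  static List<Path> listForCommits(Path jobLocation) throws IOException {
    try (Stream<Path> children = Files.list(jobLocation)) {
      return children
          .filter(child -> !Files.isDirectory(child)
              && child.getFileName().toString().endsWith(FOR_COMMIT_EXTENSION))
          .collect(Collectors.toList());
    }
  }

  public static void main(String[] args) throws IOException {
    // Stand-in for TABLE_LOCATION/temp/QUERY_ID-jobId.
    Path jobLocation = Files.createTempDirectory("job-temp");
    Files.createFile(jobLocation.resolve("task-0" + FOR_COMMIT_EXTENSION));
    Files.createFile(jobLocation.resolve("task-1" + FOR_COMMIT_EXTENSION));
    Files.createFile(jobLocation.resolve("unrelated.tmp")); // ignored by the filter
    System.out.println("forCommit files found: " + listForCommits(jobLocation).size()); // 2
  }
}
```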
@@ -451,7 +448,7 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
}

LOG.info("Committing job has started for table: {}, using location: {}",
table, generateJobLocation(outputTable.table.location(), conf, jobContext.getJobID()));
table, HiveTableUtil.jobLocation(outputTable.table.location(), conf, jobContext.getJobID()));

int numTasks = SessionStateUtil.getCommitInfo(conf, name)
.map(info -> info.get(jobContext.getJobID().toString()))
@@ -771,7 +768,7 @@ private static FilesForCommit collectResults(int numTasks, ExecutorService execu
.executeWith(executor)
.retry(3)
.run(taskId -> {
final String taskFileName = generateFileForCommitLocation(location, conf, jobContext.getJobID(), taskId);
String taskFileName = HiveTableUtil.fileForCommitLocation(location, conf, jobContext.getJobID(), taskId);
final FilesForCommit files = readFileForCommit(taskFileName, io);
LOG.debug("Found Iceberg commitTask manifest file: {}\n{}", taskFileName, files);

@@ -787,34 +784,6 @@ private static FilesForCommit collectResults(int numTasks, ExecutorService execu
mergedAndDeletedFiles);
}

/**
* Generates the job temp location based on the job configuration.
* Currently it uses TABLE_LOCATION/temp/QUERY_ID-jobId.
* @param location The location of the table
* @param conf The job's configuration
* @param jobId The JobID for the task
* @return The file to store the results
*/
@VisibleForTesting
static String generateJobLocation(String location, Configuration conf, JobID jobId) {
String queryId = conf.get(HiveConf.ConfVars.HIVE_QUERY_ID.varname);
return location + "/temp/" + queryId + "-" + jobId;
}

/**
* Generates file location based on the task configuration and a specific task id.
* This file will be used to store the data required to generate the Iceberg commit.
* Currently it uses TABLE_LOCATION/temp/QUERY_ID-jobId/task-[0..numTasks).forCommit.
* @param location The location of the table
* @param conf The job's configuration
* @param jobId The jobId for the task
* @param taskId The taskId for the commit file
* @return The file to store the results
*/
private static String generateFileForCommitLocation(String location, Configuration conf, JobID jobId, int taskId) {
return generateJobLocation(location, conf, jobId) + "/task-" + taskId + FOR_COMMIT_EXTENSION;
}

private static void createFileForCommit(FilesForCommit writeResult, String location, FileIO io) throws IOException {
OutputFile fileForCommit = io.newOutputFile(location);
try (ObjectOutputStream oos = new ObjectOutputStream(fileForCommit.createOrOverwrite())) {
@@ -855,7 +824,7 @@ public static List<FileStatus> getOutputFiles(List<JobContext> jobContexts) thro
for (JobContext jobContext : outputs.get(output)) {
Table table = output.table;
FileSystem fileSystem = new Path(table.location()).getFileSystem(jobConf);
String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
String jobLocation = HiveTableUtil.jobLocation(table.location(), jobConf, jobContext.getJobID());
// list jobLocation to get number of forCommit files
// we do this because map/reduce num in jobConf is unreliable
// and we have no access to vertex status info
@@ -903,7 +872,7 @@ public static List<ContentFile<?>> getOutputContentFiles(List<JobContext> jobCon
.run(output -> {
for (JobContext jobContext : outputs.get(output)) {
Table table = output.table;
String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
String jobLocation = HiveTableUtil.jobLocation(table.location(), jobConf, jobContext.getJobID());
// list jobLocation to get number of forCommit files
// we do this because map/reduce num in jobConf is unreliable
// and we have no access to vertex status info
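The committer's task/job handshake in this file hinges on the small serialized manifests written to the task-N.forCommit files: commitTask writes one per task attempt and the job-level commit reads them back when assembling the Iceberg commit. The sketch below mirrors that write-then-read round trip using plain java.io object streams over a local temp directory instead of Iceberg's OutputFile/FileIO; the TaskManifest payload and all names are illustrative stand-ins, not the real FilesForCommit class.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

// Sketch of the commitTask/commitJobs handshake over forCommit files.
public class CommitHandshakeSketch {

  // Stand-in for FilesForCommit: just a serializable list of file names.
  static class TaskManifest implements Serializable {
    final List<String> dataFiles;
    TaskManifest(List<String> dataFiles) { this.dataFiles = dataFiles; }
  }

  // Task side: serialize the manifest into the task's forCommit file.
  static void createFileForCommit(TaskManifest manifest, Path location) throws IOException {
    try (ObjectOutputStream oos = new ObjectOutputStream(Files.newOutputStream(location))) {
      oos.writeObject(manifest);
    }
  }

  // Job side: read one manifest per task when building the table commit.
  static TaskManifest readFileForCommit(Path location) throws IOException, ClassNotFoundException {
    try (ObjectInputStream ois = new ObjectInputStream(Files.newInputStream(location))) {
      return (TaskManifest) ois.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Path jobLocation = Files.createTempDirectory("job-temp");
    createFileForCommit(new TaskManifest(Arrays.asList("data-00000.parquet")),
        jobLocation.resolve("task-0.forCommit"));
    TaskManifest manifest = readFileForCommit(jobLocation.resolve("task-0.forCommit"));
    System.out.println(manifest.dataFiles); // [data-00000.parquet]
  }
}
```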
HiveTableUtil.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFiles;
@@ -81,6 +82,7 @@ public class HiveTableUtil {
private static final Logger LOG = LoggerFactory.getLogger(HiveTableUtil.class);

static final String TABLE_EXTENSION = ".table";
static final String FOR_COMMIT_EXTENSION = ".forCommit";

private HiveTableUtil() {
}
@@ -294,12 +296,44 @@ public Configuration get() {
}
}

private static String generateTableObjectLocation(String tableLocation, Configuration conf) {
return tableLocation + "/temp/" + conf.get(HiveConf.ConfVars.HIVE_QUERY_ID.varname) + TABLE_EXTENSION;
/**
* Generates the file location for the serialized table object.
* @param location The location of the table.
* @param conf The configuration containing the query ID.
* @return The file path for the serialized table object.
*/
private static String tableObjectLocation(String location, Configuration conf) {
String queryId = conf.get(HiveConf.ConfVars.HIVE_QUERY_ID.varname);
return location + "/temp/" + queryId + TABLE_EXTENSION;
}

/**
* Generates the job temp location based on the job configuration.
* @param location The location of the table.
* @param conf The job's configuration.
* @param jobId The JobID for the task.
* @return The directory path for the job's temporary location.
*/
public static String jobLocation(String location, Configuration conf, JobID jobId) {
String queryId = conf.get(HiveConf.ConfVars.HIVE_QUERY_ID.varname);
return location + "/temp/" + queryId + "-" + jobId;
}

/**
* Generates file location based on the task configuration and a specific task id.
* This file will be used to store the data required to generate the Iceberg commit.
* @param location The location of the table.
* @param conf The job's configuration.
* @param jobId The jobId for the task.
* @param taskId The taskId for the commit file.
* @return The file path for storing the commit data.
*/
static String fileForCommitLocation(String location, Configuration conf, JobID jobId, int taskId) {
return jobLocation(location, conf, jobId) + "/task-" + taskId + FOR_COMMIT_EXTENSION;
}

static void createFileForTableObject(Table table, Configuration conf) {
String filePath = generateTableObjectLocation(table.location(), conf);
String filePath = tableObjectLocation(table.location(), conf);
String bytes = serializeTable(table, conf, null, null);
OutputFile serializedTableFile = table.io().newOutputFile(filePath);
try (ObjectOutputStream oos = new ObjectOutputStream(serializedTableFile.createOrOverwrite())) {
@@ -311,7 +345,7 @@ static void createFileForTableObject(Table table, Configuration conf) {
}

static void cleanupTableObjectFile(String location, Configuration configuration) {
String filePath = generateTableObjectLocation(location, configuration);
String filePath = tableObjectLocation(location, configuration);
Path toDelete = new Path(filePath);
try {
FileSystem fs = Util.getFs(toDelete, configuration);
@@ -322,7 +356,7 @@ static void cleanupTableObjectFile(String location, Configuration configuration)
}

private static Table readTableObjectFromFile(String location, Configuration config) {
String filePath = generateTableObjectLocation(location, config);
String filePath = tableObjectLocation(location, config);
try (FileIO io = new HadoopFileIO(config)) {
try (ObjectInputStream ois = new ObjectInputStream(io.newInputFile(filePath).newStream())) {
return SerializationUtil.deserializeFromBase64((String) ois.readObject());
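The relocated helpers in this file define the whole temp-file layout under the table location: QUERY_ID.table for the serialized table object, QUERY_ID-jobId for the per-job directory, and task-N.forCommit inside it. A minimal sketch of that string construction follows, with plain strings standing in for the Configuration and JobID lookups; the table location, query ID, and job ID values are hypothetical.

```java
// Sketch of the temp-path layout produced by the HiveTableUtil helpers.
// Plain String parameters replace the Configuration/JobID lookups in the real code.
public class TempPathLayoutSketch {

  static final String TABLE_EXTENSION = ".table";
  static final String FOR_COMMIT_EXTENSION = ".forCommit";

  // Mirrors tableObjectLocation(): TABLE_LOCATION/temp/QUERY_ID.table
  static String tableObjectLocation(String location, String queryId) {
    return location + "/temp/" + queryId + TABLE_EXTENSION;
  }

  // Mirrors jobLocation(): TABLE_LOCATION/temp/QUERY_ID-jobId
  static String jobLocation(String location, String queryId, String jobId) {
    return location + "/temp/" + queryId + "-" + jobId;
  }

  // Mirrors fileForCommitLocation(): <jobLocation>/task-<taskId>.forCommit
  static String fileForCommitLocation(String location, String queryId, String jobId, int taskId) {
    return jobLocation(location, queryId, jobId) + "/task-" + taskId + FOR_COMMIT_EXTENSION;
  }

  public static void main(String[] args) {
    String location = "hdfs://nn/warehouse/db/tbl";  // hypothetical table location
    String queryId = "hive_20240101000000_q1";       // hypothetical HIVE_QUERY_ID
    String jobId = "job_1700000000000_0001";         // hypothetical JobID
    System.out.println(tableObjectLocation(location, queryId));
    System.out.println(jobLocation(location, queryId, jobId));
    System.out.println(fileForCommitLocation(location, queryId, jobId, 0));
  }
}
```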

This file was deleted.
