diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index 7f1ea6175ed6e..2f1fc0ba61853 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -68,7 +68,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
       "mapreduce.fileoutputcommitter.marksuccessfuljobs";
   public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
       "mapreduce.fileoutputcommitter.algorithm.version";
-  public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 2;
+  public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1;
   // Skip cleanup _temporary folders under job's output directory
   public static final String FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED =
       "mapreduce.fileoutputcommitter.cleanup.skipped";
@@ -93,15 +93,18 @@ public class FileOutputCommitter extends PathOutputCommitter {
   // commitJob will recursively delete the entire job temporary directory.
   // HDFS has O(1) recursive delete, so this parameter is left false by default.
   // Users of object stores, for example, may want to set this to true. Note:
-  // this is only used if mapreduce.fileoutputcommitter.algorithm.version=2
+  // this was only used if mapreduce.fileoutputcommitter.algorithm.version=2,
+  // and so is now unused.
+  @Deprecated
   public static final String FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED =
       "mapreduce.fileoutputcommitter.task.cleanup.enabled";
+
+  @Deprecated
   public static final boolean
       FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT = false;
 
   private Path outputPath = null;
   private Path workPath = null;
-  private final int algorithmVersion;
   private final boolean skipCleanup;
   private final boolean ignoreCleanupFailures;
 
@@ -135,13 +138,18 @@ public FileOutputCommitter(Path outputPath,
       JobContext context) throws IOException {
     super(outputPath, context);
     Configuration conf = context.getConfiguration();
-    algorithmVersion =
+    int algorithmVersion =
         conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
                     FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT);
     LOG.info("File Output Committer Algorithm version is " + algorithmVersion);
     if (algorithmVersion != 1 && algorithmVersion != 2) {
       throw new IOException("Only 1 or 2 algorithm version is supported");
     }
+    // Downgrade v2 to v1 with a warning.
+    if (algorithmVersion == 2) {
+      LOG.warn("The v2 algorithm has been removed to ensure the output of jobs"
+          + " is always correct");
+    }
 
     // if skip cleanup
     skipCleanup = conf.getBoolean(
@@ -399,10 +407,8 @@ protected void commitJobInternal(JobContext context) throws IOException {
     Path finalOutput = getOutputPath();
     FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
 
-    if (algorithmVersion == 1) {
-      for (FileStatus stat: getAllCommittedTaskPaths(context)) {
-        mergePaths(fs, stat, finalOutput, context);
-      }
+    for (FileStatus stat: getAllCommittedTaskPaths(context)) {
+      mergePaths(fs, stat, finalOutput, context);
     }
 
     if (skipCleanup) {
@@ -503,16 +509,9 @@ private void reportProgress(JobContext context) {
 
   private void renameOrMerge(FileSystem fs, FileStatus from, Path to,
       JobContext context) throws IOException {
-    if (algorithmVersion == 1) {
-      if (!fs.rename(from.getPath(), to)) {
-        throw new IOException("Failed to rename " + from + " to " + to);
-      }
-    } else {
-      fs.mkdirs(to);
-      for (FileStatus subFrom : fs.listStatus(from.getPath())) {
-        Path subTo = new Path(to, subFrom.getPath().getName());
-        mergePaths(fs, subFrom, subTo, context);
-      }
+    // This always renames now that the v2 algorithm has been removed.
+    if (!fs.rename(from.getPath(), to)) {
+      throw new IOException("Failed to rename " + from + " to " + to);
     }
   }
 
@@ -588,36 +587,17 @@ public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
     }
 
     if (taskAttemptDirStatus != null) {
-      if (algorithmVersion == 1) {
-        Path committedTaskPath = getCommittedTaskPath(context);
-        if (fs.exists(committedTaskPath)) {
-          if (!fs.delete(committedTaskPath, true)) {
-            throw new IOException("Could not delete " + committedTaskPath);
-          }
-        }
-        if (!fs.rename(taskAttemptPath, committedTaskPath)) {
-          throw new IOException("Could not rename " + taskAttemptPath + " to "
-              + committedTaskPath);
-        }
-        LOG.info("Saved output of task '" + attemptId + "' to " +
-            committedTaskPath);
-      } else {
-        // directly merge everything from taskAttemptPath to output directory
-        mergePaths(fs, taskAttemptDirStatus, outputPath, context);
-        LOG.info("Saved output of task '" + attemptId + "' to " +
-            outputPath);
-
-        if (context.getConfiguration().getBoolean(
-            FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED,
-            FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT)) {
-          LOG.debug(String.format(
-              "Deleting the temporary directory of '%s': '%s'",
-              attemptId, taskAttemptPath));
-          if(!fs.delete(taskAttemptPath, true)) {
-            LOG.warn("Could not delete " + taskAttemptPath);
-          }
-        }
+      Path committedTaskPath = getCommittedTaskPath(context);
+      if (fs.exists(committedTaskPath)
+          && !fs.delete(committedTaskPath, true)) {
+        throw new IOException("Could not delete " + committedTaskPath);
       }
+      if (!fs.rename(taskAttemptPath, committedTaskPath)) {
+        throw new IOException("Could not rename " + taskAttemptPath + " to "
+            + committedTaskPath);
+      }
+      LOG.info("Saved output of task '" + attemptId + "' to " +
+          committedTaskPath);
     } else {
       LOG.warn("No Output found for " + attemptId);
     }
@@ -682,7 +662,7 @@ public boolean isRecoverySupported() {
 
   @Override
   public boolean isCommitJobRepeatable(JobContext context) throws IOException {
-    return algorithmVersion == 2;
+    return false;
   }
 
   @Override
@@ -702,35 +682,21 @@ public void recoverTask(TaskAttemptContext context)
       if (LOG.isDebugEnabled()) {
         LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
       }
-      if (algorithmVersion == 1) {
-        if (fs.exists(previousCommittedTaskPath)) {
-          Path committedTaskPath = getCommittedTaskPath(context);
-          if (!fs.delete(committedTaskPath, true) &&
-              fs.exists(committedTaskPath)) {
-            throw new IOException("Could not delete " + committedTaskPath);
-          }
-          //Rename can fail if the parent directory does not yet exist.
-          Path committedParent = committedTaskPath.getParent();
-          fs.mkdirs(committedParent);
-          if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
-            throw new IOException("Could not rename " + previousCommittedTaskPath +
-                " to " + committedTaskPath);
-          }
-        } else {
-          LOG.warn(attemptId+" had no output to recover.");
+      if (fs.exists(previousCommittedTaskPath)) {
+        Path committedTaskPath = getCommittedTaskPath(context);
+        if (!fs.delete(committedTaskPath, true) &&
+            fs.exists(committedTaskPath)) {
+          throw new IOException("Could not delete " + committedTaskPath);
         }
-      } else {
-        // essentially a no-op, but for backwards compatibility
-        // after upgrade to the new fileOutputCommitter,
-        // check if there are any output left in committedTaskPath
-        try {
-          FileStatus from = fs.getFileStatus(previousCommittedTaskPath);
-          LOG.info("Recovering task for upgrading scenario, moving files from "
-              + previousCommittedTaskPath + " to " + outputPath);
-          mergePaths(fs, from, outputPath, context);
-        } catch (FileNotFoundException ignored) {
+        //Rename can fail if the parent directory does not yet exist.
+        Path committedParent = committedTaskPath.getParent();
+        fs.mkdirs(committedParent);
+        if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
+          throw new IOException("Could not rename " + previousCommittedTaskPath +
+              " to " + committedTaskPath);
        }
-        LOG.info("Done recovering task " + attemptId);
+      } else {
+        LOG.warn(attemptId + " had no output to recover.");
       }
     } else {
       LOG.warn("Output Path is null in recoverTask()");
@@ -744,7 +710,6 @@ public String toString() {
     sb.append(super.toString()).append("; ");
     sb.append("outputPath=").append(outputPath);
     sb.append(", workPath=").append(workPath);
-    sb.append(", algorithmVersion=").append(algorithmVersion);
     sb.append(", skipCleanup=").append(skipCleanup);
     sb.append(", ignoreCleanupFailures=").append(ignoreCleanupFailures);
     sb.append('}');
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index c40bb0b19c88b..a92719b8d2614 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1562,10 +1562,18 @@
 <property>
   <name>mapreduce.fileoutputcommitter.algorithm.version</name>
-  <value>2</value>
-  <description>The file output committer algorithm version
-    valid algorithm version number: 1 or 2
-    default to 2, which is the original algorithm
+  <value>1</value>
+  <description>The file output committer algorithm version.
+
+    There have been two algorithm versions in Hadoop, "1" and "2".
+
+    The version 2 algorithm has been disabled as task commits
+    were not atomic. If the first task attempt failed partway
+    through the task commit, the output directory may end up
+    with data from that failed commit, as well as the data
+    from any subsequent attempts.
+
+    See https://issues.apache.org/jira/browse/MAPREDUCE-7282
 
     In algorithm version 1,
@@ -1585,32 +1593,58 @@
     $joboutput/, then it will delete $joboutput/_temporary/ and
     write $joboutput/_SUCCESS
 
-    It has a performance regression, which is discussed in MAPREDUCE-4815.
+    It has a performance limitation, which is discussed in MAPREDUCE-4815.
+
     If a job generates many files to commit then the
     commitJob method call at the end of the job can take minutes.
     the commit is single-threaded and waits until all tasks have
     completed before commencing.
 
-    algorithm version 2 will change the behavior of commitTask,
+    The algorithm version 2 changed the behavior of commitTask,
     recoverTask, and commitJob.
-    1. commitTask will rename all files in
+    1. commitTask renamed all files in
     $joboutput/_temporary/$appAttemptID/_temporary/$taskAttemptID/
     to $joboutput/
-    2. recoverTask actually doesn't require to do anything, but for
+    2. recoverTask didn't do anything, but for
     upgrade from version 1 to version 2 case, it will check if there
     are any files in
     $joboutput/_temporary/($appAttemptID - 1)/$taskID/
     and rename them to $joboutput/
-    3. commitJob can simply delete $joboutput/_temporary and write
+    3. commitJob deleted $joboutput/_temporary and wrote
     $joboutput/_SUCCESS
 
-    This algorithm will reduce the output commit time for
-    large jobs by having the tasks commit directly to the final
-    output directory as they were completing and commitJob had
-    very little to do.
+    This algorithm reduced the output commit time for large jobs
+    as the work of renaming files to their destination took place
+    incrementally as tasks committed. However, it has a key flaw:
+
+    Task attempt commit is not atomic. If task attempt 1 failed
+    partway through the rename, a second task attempt would be
+    scheduled.
+
+    - If task attempts 1 and 2 generated files with different names,
+      then any files from attempt 1 which had already been renamed
+      into the destination would still be present; they would not
+      be overwritten by the second attempt.
+
+    - If task attempt 1 was still executing (even though the job
+      driver considered it to have failed) then a race condition
+      could arise where the output contained a mix of both task
+      attempts.
+
+    The applications which use these committers, including MapReduce
+    and Apache Spark, expect task commit to be atomic; this commit
+    algorithm is not compatible with that expectation.
+
+    Note:
+    * The Apache Hadoop S3A commit algorithms do have atomic task
+      commit and are safe.
+    * The Amazon "EMRFS S3-optimized Committer" completes its
+      multipart uploads in task commit, so it has the same
+      limitation as algorithm v2. See
+      https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-committer-multipart.html
   </description>
 </property>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
index bb5c30e9511fa..36917531b4021 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
@@ -161,16 +161,6 @@ public void testRecoveryV1() throws Exception {
     testRecoveryInternal(1, 1);
   }
 
-  @Test
-  public void testRecoveryV2() throws Exception {
-    testRecoveryInternal(2, 2);
-  }
-
-  @Test
-  public void testRecoveryUpgradeV1V2() throws Exception {
-    testRecoveryInternal(1, 2);
-  }
-
   private void validateContent(Path dir) throws IOException {
     File fdir = new File(dir.toUri().getPath());
     File expectedFile = new File(fdir, partFile);
@@ -215,12 +205,6 @@ public void testCommitterWithFailureV1() throws Exception {
     testCommitterWithFailureInternal(1, 2);
   }
 
-  @Test
-  public void testCommitterWithFailureV2() throws Exception {
-    testCommitterWithFailureInternal(2, 1);
-    testCommitterWithFailureInternal(2, 2);
-  }
-
   private void testCommitterWithFailureInternal(int version, int maxAttempts)
       throws Exception {
     JobConf conf = new JobConf();
     FileOutputFormat.setOutputPath(conf, outDir);
@@ -269,11 +253,6 @@ public void testCommitterWithDuplicatedCommitV1() throws Exception {
     testCommitterWithDuplicatedCommitInternal(1);
   }
 
-  @Test
-  public void testCommitterWithDuplicatedCommitV2() throws Exception {
-    testCommitterWithDuplicatedCommitInternal(2);
-  }
-
   private void testCommitterWithDuplicatedCommitInternal(int version)
       throws Exception {
     JobConf conf = new JobConf();
@@ -355,11 +334,6 @@ public void testCommitterV1() throws Exception {
     testCommitterInternal(1);
   }
 
-  @Test
-  public void testCommitterV2() throws Exception {
-    testCommitterInternal(2);
-  }
-
   private void testMapFileOutputCommitterInternal(int version)
       throws Exception {
     JobConf conf = new JobConf();
@@ -397,21 +371,11 @@ public void testMapFileOutputCommitterV1() throws Exception {
     testMapFileOutputCommitterInternal(1);
   }
 
-  @Test
-  public void testMapFileOutputCommitterV2() throws Exception {
-    testMapFileOutputCommitterInternal(2);
-  }
-
   @Test
   public void testMapOnlyNoOutputV1() throws Exception {
     testMapOnlyNoOutputInternal(1);
   }
 
-  @Test
-  public void testMapOnlyNoOutputV2() throws Exception {
-    testMapOnlyNoOutputInternal(2);
-  }
-
   private void testMapOnlyNoOutputInternal(int version) throws Exception {
     JobConf conf = new JobConf();
     //This is not set on purpose. FileOutputFormat.setOutputPath(conf, outDir);
@@ -477,11 +441,6 @@ public void testAbortV1() throws Exception {
     testAbortInternal(1);
   }
 
-  @Test
-  public void testAbortV2() throws Exception {
-    testAbortInternal(2);
-  }
-
   public static class FakeFileSystem extends RawLocalFileSystem {
     public FakeFileSystem() {
       super();
@@ -560,10 +519,6 @@ public void testFailAbortV1() throws Exception {
     testFailAbortInternal(1);
   }
 
-  @Test
-  public void testFailAbortV2() throws Exception {
-    testFailAbortInternal(2);
-  }
   public static String slurp(File f) throws IOException {
     int len = (int) f.length();
     byte[] buf = new byte[len];
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
index 526485df93490..134823b6be671 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
@@ -209,16 +209,6 @@ public void testRecoveryV1() throws Exception {
     testRecoveryInternal(1, 1);
   }
 
-  @Test
-  public void testRecoveryV2() throws Exception {
-    testRecoveryInternal(2, 2);
-  }
-
-  @Test
-  public void testRecoveryUpgradeV1V2() throws Exception {
-    testRecoveryInternal(1, 2);
-  }
-
   private void validateContent(Path dir) throws IOException {
     validateContent(new File(dir.toUri().getPath()));
   }
@@ -270,9 +260,6 @@ private void testCommitterInternal(int version, boolean taskCleanup)
     conf.setInt(
         FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
         version);
-    conf.setBoolean(
-        FileOutputCommitter.FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED,
-        taskCleanup);
     JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
     FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -321,26 +308,11 @@ public void testCommitterV1() throws Exception {
     testCommitterInternal(1, false);
   }
 
-  @Test
-  public void testCommitterV2() throws Exception {
-    testCommitterInternal(2, false);
-  }
-
-  @Test
-  public void testCommitterV2TaskCleanupEnabled() throws Exception {
-    testCommitterInternal(2, true);
-  }
-
   @Test
   public void testCommitterWithDuplicatedCommitV1() throws Exception {
     testCommitterWithDuplicatedCommitInternal(1);
   }
 
-  @Test
-  public void testCommitterWithDuplicatedCommitV2() throws Exception {
-    testCommitterWithDuplicatedCommitInternal(2);
-  }
-
   private void testCommitterWithDuplicatedCommitInternal(int version)
       throws Exception {
     Job job = Job.getInstance();
@@ -389,12 +361,6 @@ public void testCommitterWithFailureV1() throws Exception {
     testCommitterWithFailureInternal(1, 2);
   }
 
-  @Test
-  public void testCommitterWithFailureV2() throws Exception {
-    testCommitterWithFailureInternal(2, 1);
-    testCommitterWithFailureInternal(2, 2);
-  }
-
   private void testCommitterWithFailureInternal(int version, int maxAttempts)
       throws Exception {
     Job job = Job.getInstance();
@@ -473,11 +439,6 @@ public void testCommitterRepeatableV1() throws Exception {
     testCommitterRetryInternal(1);
   }
 
-  @Test
-  public void testCommitterRepeatableV2() throws Exception {
-    testCommitterRetryInternal(2);
-  }
-
   // retry committer for 2 times.
   private void testCommitterRetryInternal(int version)
       throws Exception {
@@ -577,11 +538,6 @@ public void testMapFileOutputCommitterV1() throws Exception {
     testMapFileOutputCommitterInternal(1);
   }
 
-  @Test
-  public void testMapFileOutputCommitterV2() throws Exception {
-    testMapFileOutputCommitterInternal(2);
-  }
-
   @Test
   public void testInvalidVersionNumber() throws IOException {
     Job job = Job.getInstance();
@@ -639,11 +595,6 @@ public void testAbortV1() throws IOException, InterruptedException {
     testAbortInternal(1);
   }
 
-  @Test
-  public void testAbortV2() throws IOException, InterruptedException {
-    testAbortInternal(2);
-  }
-
   public static class FakeFileSystem extends RawLocalFileSystem {
     public FakeFileSystem() {
       super();
@@ -720,11 +671,6 @@ public void testFailAbortV1() throws Exception {
     testFailAbortInternal(1);
  }
 
-  @Test
-  public void testFailAbortV2() throws Exception {
-    testFailAbortInternal(2);
-  }
-
   static class RLFS extends RawLocalFileSystem {
     private final ThreadLocal<Boolean> needNull = new ThreadLocal<Boolean>() {
       @Override
@@ -823,11 +769,6 @@ public void testConcurrentCommitTaskWithSubDirV1() throws Exception {
     testConcurrentCommitTaskWithSubDir(1);
   }
 
-  @Test
-  public void testConcurrentCommitTaskWithSubDirV2() throws Exception {
-    testConcurrentCommitTaskWithSubDir(2);
-  }
-
   public static String slurp(File f) throws IOException {
     int len = (int) f.length();
     byte[] buf = new byte[len];
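For reviewers, the v1 protocol that the committer now always follows is, in outline: commit a task by renaming its whole attempt directory, then commit the job by merging the committed directories into the destination. The sketch below is a deliberate simplification, not the class's real code; the actual mergePaths() recurses into subdirectories and resolves file/directory collisions, and the method names and parameters here are invented for illustration.

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative sketch of the v1 commit protocol; not the real committer. */
public class V1CommitSketch {

  /**
   * Task commit: one rename of the whole task attempt directory.
   * HDFS renames are atomic, so an attempt is either fully committed
   * or not at all, and a later attempt can cleanly replace an earlier one.
   */
  static void commitTask(FileSystem fs, Path taskAttemptDir,
      Path committedTaskDir) throws IOException {
    if (fs.exists(committedTaskDir) && !fs.delete(committedTaskDir, true)) {
      throw new IOException("Could not delete " + committedTaskDir);
    }
    if (!fs.rename(taskAttemptDir, committedTaskDir)) {
      throw new IOException("Could not rename " + taskAttemptDir
          + " to " + committedTaskDir);
    }
  }

  /**
   * Job commit: merge every committed task directory into the final
   * output. This is the single-threaded pass whose cost is discussed
   * in MAPREDUCE-4815; it runs only after all tasks have succeeded.
   */
  static void commitJob(FileSystem fs, Path[] committedTaskDirs,
      Path finalOutput) throws IOException {
    for (Path committed : committedTaskDirs) {
      for (FileStatus stat : fs.listStatus(committed)) {
        Path dest = new Path(finalOutput, stat.getPath().getName());
        if (!fs.rename(stat.getPath(), dest)) {
          throw new IOException("Failed to rename " + stat.getPath()
              + " to " + dest);
        }
      }
    }
  }
}
```

The safety property here is that a task's only externally visible state change is the single directory rename, and the final output directory is touched only in commitJob, after the job as a whole has succeeded.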
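By contrast, the flaw documented in the mapred-default.xml text above comes from v2 renaming files into the live destination one at a time during task commit. A minimal sketch (again with invented names, not the removed implementation itself) shows where a mid-commit failure leaves partial output:

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative sketch of why the removed v2 task commit was unsafe. */
public class V2NonAtomicCommitSketch {
  static void commitTaskV2(FileSystem fs, Path taskAttemptDir,
      Path finalOutput) throws IOException {
    for (FileStatus stat : fs.listStatus(taskAttemptDir)) {
      Path dest = new Path(finalOutput, stat.getPath().getName());
      // If the attempt dies here, files renamed in earlier iterations
      // are already visible in finalOutput. A second attempt that
      // generates differently named files will not overwrite them,
      // so the job output can contain a mix of both attempts.
      if (!fs.rename(stat.getPath(), dest)) {
        throw new IOException("Failed to rename " + stat.getPath()
            + " to " + dest);
      }
    }
  }
}
```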