Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class FileOutputCommitter extends PathOutputCommitter {
"mapreduce.fileoutputcommitter.marksuccessfuljobs";
public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
"mapreduce.fileoutputcommitter.algorithm.version";
public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 2;
public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1;
// Skip cleanup _temporary folders under job's output directory
public static final String FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED =
"mapreduce.fileoutputcommitter.cleanup.skipped";
Expand Down Expand Up @@ -348,6 +348,19 @@ public Path getWorkPath() throws IOException {
* @param context the job's context
*/
public void setupJob(JobContext context) throws IOException {
// warning about v2 which is made to a custom logger so people can
// turn it off if they are happy with the v2 commit protocol.
if (algorithmVersion == 2) {
Logger log = LoggerFactory.getLogger(
"org.apache.hadoop.mapreduce.lib.output."
+ "FileOutputCommitter.Algorithm");

log.warn("The v2 commit algorithm assumes that the content of generated output files is"
+ " consistent across all task attempts"
+ " -if this is not true for this job, switch to the v1 commit algorithm."
+ " See MAPREDUCE-7282");
}

if (hasOutputPath()) {
Path jobAttemptPath = getJobAttemptPath(context);
FileSystem fs = jobAttemptPath.getFileSystem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1562,10 +1562,10 @@

<property>
<name>mapreduce.fileoutputcommitter.algorithm.version</name>
<value>2</value>
<description>The file output committer algorithm version
valid algorithm version number: 1 or 2
default to 2, which is the original algorithm
<value>1</value>
<description>The file output committer algorithm version.

There are two algorithm versions in Hadoop, "1" and "2".

In algorithm version 1,

Expand Down Expand Up @@ -1611,6 +1611,35 @@
large jobs by having the tasks commit directly to the final
output directory as they were completing and commitJob had
very little to do.

The v2 commit algorithm assumes that the content of generated output files
is consistent across all task attempts - if this is not true for a job,
the v1 commit algorithm should be used.

This is because task commits are not atomic

If a first task attempt fails part-way
through its task commit, the output directory can end up
with data from that failed commit, alongside the data
from any subsequent attempts.

See https://issues.apache.org/jira/browse/MAPREDUCE-7282

Although no-longer the default, this algorithm is safe to use if
all task attempts for a single task meet the following requirements
-they generate exactly the same set of files
-the contents of each file are exactly the same in each task attempt

That is:
1. If a second attempt commits work, there will be no leftover files from
a first attempt which failed during its task commit.
2. If a network partition causes the first task attempt to overwrite
some/all of the output of a second attempt, the result will be
exactly the same as if it had not done so.

To avoid the warning message on job setup, set the log level of the log
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.Algorithm
to ERROR.
</description>
</property>

Expand Down