Skip to content

Commit 9040a2f

Browse files
author
vnarayanan
committed
TEZ-4547: Add Tez AM JobID to the JobConf
Some committers require a job-wide UUID to function correctly. Adding the AM JobID to the JobConf will allow applications to pass that to the committers that need it.
1 parent 34bb628 commit 9040a2f

File tree

4 files changed

+27
-0
lines changed

4 files changed

+27
-0
lines changed

tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ public void abortOutput(VertexStatus.State finalState) throws IOException {
119119
|| jobConf.getBoolean("mapred.mapper.new-api", false)) {
120120
newApiCommitter = true;
121121
}
122+
jobConf.set(MRJobConfig.MR_PARENT_JOB_ID, new org.apache.hadoop.mapred.JobID(String.valueOf(getContext().getApplicationId().getClusterTimestamp()), getContext().getApplicationId().getId()).toString());
122123
LOG.info("Committer for " + getContext().getVertexName() + ":" + getContext().getOutputName() +
123124
" using " + (newApiCommitter ? "new" : "old") + "mapred API");
124125

tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ public interface MRJobConfig {
131131

132132
public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities";
133133

134+
public static final String MR_PARENT_JOB_ID = "mapreduce.parent.job.id";
135+
134136
public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION = "mapreduce.fileoutputcommitter.algorithm.version";
135137

136138
/**

tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.hadoop.mapred.FileOutputFormat;
4545
import org.apache.hadoop.mapred.JobConf;
4646
import org.apache.hadoop.mapred.JobContext;
47+
import org.apache.hadoop.mapred.JobID;
4748
import org.apache.hadoop.mapred.TaskAttemptID;
4849
import org.apache.hadoop.mapreduce.OutputCommitter;
4950
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -417,6 +418,7 @@ protected List<Event> initializeBase() throws IOException, InterruptedException
417418
.createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(),
418419
getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(),
419420
getContext().getTaskIndex(), getContext().getTaskAttemptNumber(), isMapperOutput);
421+
jobConf.set(MRJobConfig.MR_PARENT_JOB_ID, new JobID(String.valueOf(getContext().getApplicationId().getClusterTimestamp()), getContext().getApplicationId().getId()).toString());
420422
jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
421423
jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
422424
jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);

tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.hadoop.io.NullWritable;
3636
import org.apache.hadoop.io.Text;
3737
import org.apache.hadoop.mapred.JobConf;
38+
import org.apache.hadoop.mapred.JobID;
3839
import org.apache.hadoop.mapred.Reporter;
3940
import org.apache.hadoop.mapreduce.JobContext;
4041
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -57,6 +58,7 @@
5758
import org.apache.tez.mapreduce.TestUmbilical;
5859
import org.apache.tez.mapreduce.TezTestUtils;
5960
import org.apache.tez.mapreduce.hadoop.MRConfig;
61+
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
6062
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
6163
import org.apache.tez.runtime.api.OutputContext;
6264
import org.apache.tez.runtime.api.ProcessorContext;
@@ -131,6 +133,26 @@ public void testMergeConfig() throws Exception {
131133
assertEquals("base-value", mergedConf.get("base-key"));
132134
}
133135

136+
@Test
137+
public void testParentJobIDSet() throws Exception {
138+
Configuration conf = new Configuration();
139+
conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, true);
140+
DataSinkDescriptor dataSink = MROutput
141+
.createConfigBuilder(conf, TextOutputFormat.class,
142+
tmpDir.getPath())
143+
.build();
144+
145+
OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
146+
new Configuration(false));
147+
MROutput output = new MROutput(outputContext, 2);
148+
output.initialize();
149+
String invalidJobID = "invalid default";
150+
String parentJobID = output.jobConf.get(MRJobConfig.MR_PARENT_JOB_ID, invalidJobID);
151+
assertNotEquals(parentJobID,invalidJobID);
152+
assertNotEquals(output.jobConf.get(org.apache.hadoop.mapred.JobContext.TASK_ATTEMPT_ID),parentJobID);
153+
assertEquals(parentJobID, new JobID(String.valueOf(outputContext.getApplicationId().getClusterTimestamp()),outputContext.getApplicationId().getId()).toString());
154+
}
155+
134156
@Test(timeout = 5000)
135157
public void testOldAPI_TextOutputFormat() throws Exception {
136158
Configuration conf = new Configuration();

0 commit comments

Comments
 (0)