Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.OutputCommitter;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.hadoop.file.HadoopCodecFactory;
Expand Down Expand Up @@ -85,6 +86,18 @@ protected static CodecFactory getCompressionCodec(TaskAttemptContext context) {
return CodecFactory.nullCodec();
}

private Path getWorkPathFromCommitter(TaskAttemptContext context) throws IOException {
// When Hadoop 2 support is dropped, this method removed to a simple cast
// See https://github.com/apache/avro/pull/1431/
OutputCommitter committer = getOutputCommitter(context);
try {
return (Path) committer.getClass().getMethod("getWorkPath").invoke(committer);
} catch (ReflectiveOperationException e) {
throw new AvroRuntimeException(
"Committer: " + committer.getClass().getName() + " does not have method getWorkPath", e);
}
}

/**
* Gets the target output stream where the Avro container file should be
* written.
Expand All @@ -93,7 +106,7 @@ protected static CodecFactory getCompressionCodec(TaskAttemptContext context) {
* @return The target output stream.
*/
protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException {
Path path = new Path(((FileOutputCommitter) getOutputCommitter(context)).getWorkPath(),
Path path = new Path(getWorkPathFromCommitter(context),
getUniqueFile(context, context.getConfiguration().get("avro.mo.config.namedOutput", "part"),
org.apache.avro.mapred.AvroOutputFormat.EXT));
return path.getFileSystem(context.getConfiguration()).create(path);
Expand Down