diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java index 5970373a9f31a..cf5df706beeff 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java @@ -24,13 +24,14 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.common.Abortable; /** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}. */ @InterfaceAudience.Public @InterfaceStability.Stable public class FSDataOutputStream extends DataOutputStream - implements Syncable, CanSetDropBehind, StreamCapabilities { + implements Syncable, CanSetDropBehind, Abortable, StreamCapabilities { private final OutputStream wrappedStream; private static class PositionCache extends FilterOutputStream { @@ -101,6 +102,15 @@ public void close() throws IOException { out.close(); // This invokes PositionCache.close() } + @Override + public void abort() throws IOException { + if (wrappedStream instanceof Abortable) { + ((Abortable) wrappedStream).abort(); + } else { + wrappedStream.close(); + } + } + @Override public String toString() { final StringBuilder sb = new StringBuilder( diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/common/Abortable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/common/Abortable.java new file mode 100644 index 0000000000000..72880ca8bccf7 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/common/Abortable.java @@ -0,0 +1,14 @@ +package org.apache.hadoop.fs.common; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; + +@Private +public 
interface Abortable { + /** + * Instruct the object to abort its current transaction. This will also close + * the object. + */ + public void abort() throws IOException; +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java index 51db0b3f0afef..0f4976bdbb693 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.common.Abortable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.CompressionCodec; @@ -71,7 +72,7 @@ public class MapFile { protected MapFile() {} // no public ctor /** Writes a new map. 
*/ - public static class Writer implements java.io.Closeable { + public static class Writer implements java.io.Closeable, Abortable { private SequenceFile.Writer data; private SequenceFile.Writer index; @@ -285,6 +286,15 @@ public Writer(Configuration conf, this.index = SequenceFile.createWriter(conf, indexOptions); } + @Override + public void abort() throws IOException { + try { + data.abort(); + } finally { + index.abort(); + } + } + /** The number of entries that are added before an index entry is added.*/ public int getIndexInterval() { return indexInterval; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java index f42848b00cd34..6088fa1e60601 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java @@ -25,6 +25,7 @@ import java.security.MessageDigest; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.common.Abortable; import org.apache.hadoop.util.Options; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.Options.CreateOpts; @@ -834,7 +835,7 @@ public String toString() { } /** Write key/value pairs to a sequence-format file. 
*/ - public static class Writer implements java.io.Closeable, Syncable { + public static class Writer implements java.io.Closeable, Syncable, Abortable { private Configuration conf; FSDataOutputStream out; boolean ownOutputStream = true; @@ -875,6 +876,15 @@ public static class Writer implements java.io.Closeable, Syncable { } } + @Override + public void abort() throws IOException { + if (out instanceof Abortable) { + ((Abortable) out).abort(); + } else { + out.close(); + } + } + public static interface Option {} static class FileOption extends Options.PathOption diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFile.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFile.java index 044824356ed30..24601820bde64 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFile.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFile.java @@ -37,6 +37,10 @@ import static org.junit.Assert.fail; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -487,6 +491,19 @@ private void sortMetadataTest(FileSystem fs, Path unsortedFile, Path sortedFile, sorter.sort(new Path[] { unsortedFile }, sortedFile, false); } + @Test + public void testAbort() throws IOException { + Configuration conf = new Configuration(); + LocalFileSystem fs = FileSystem.getLocal(conf); + Path path1 = new Path(System.getProperty("test.build.data",".")+"/test2.seq"); + SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path1, + Text.class, NullWritable.class, CompressionType.BLOCK); + writer.out = mock(FSDataOutputStream.class); + writer.append(new Text("file1-1"), 
NullWritable.get()); + writer.abort(); + verify(writer.out, times(1)).abort(); + } + @SuppressWarnings("deprecation") @Test public void testClose() throws IOException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AbortableRecordWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AbortableRecordWriter.java new file mode 100644 index 0000000000000..0f6a95606a365 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AbortableRecordWriter.java @@ -0,0 +1,6 @@ +package org.apache.hadoop.mapred; + +import org.apache.hadoop.fs.common.Abortable; + +public abstract class AbortableRecordWriter implements RecordWriter, Abortable { +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapFileOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapFileOutputFormat.java index bc746c5601013..931a85de8c03a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapFileOutputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapFileOutputFormat.java @@ -69,7 +69,7 @@ public RecordWriter getRecordWriter(FileSystem ign compressionType, codec, progress); - return new RecordWriter() { + return new AbortableRecordWriter() { public void write(WritableComparable key, Writable value) throws IOException { @@ -78,7 +78,12 @@ public void write(WritableComparable key, Writable value) } public void close(Reporter reporter) throws IOException { out.close();} - }; + + @Override + public void abort() throws IOException { + out.abort(); + } 
+ }; } /** Open the output generated by this format. */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java index 17461b196b37e..45adb4f349bfd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.fs.common.Abortable; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.RawComparator; @@ -461,6 +462,7 @@ void runOldMapper(final JobConf job, MapRunnable runner = ReflectionUtils.newInstance(job.getMapRunnerClass(), job); + boolean successful = false; try { runner.run(in, new OldOutputCollector(collector, conf), reporter); mapPhase.complete(); @@ -469,6 +471,7 @@ void runOldMapper(final JobConf job, setPhase(TaskStatus.Phase.SORT); } statusUpdate(umbilical); + successful = true; collector.flush(); in.close(); @@ -477,6 +480,13 @@ void runOldMapper(final JobConf job, collector.close(); collector = null; } finally { + try { + if (!successful && collector instanceof Abortable) { + ((Abortable) collector).abort(); + } + } catch (Exception ex) { + LOG.error(ex.getMessage()); + } closeQuietly(in); closeQuietly(collector); } @@ -811,7 +821,7 @@ void runNewMapper(final JobConf job, } class DirectMapOutputCollector - implements MapOutputCollector { + implements MapOutputCollector, Abortable { private RecordWriter out = null; @@ -881,6 +891,15 @@ private long getOutputBytes(List stats) { } return 
bytesWritten; } + + @Override + public void abort() throws IOException { + if (out instanceof Abortable) { + ((Abortable) out).abort(); + } else { + out.close(reporter); + } + } } @InterfaceAudience.LimitedPrivate({"MapReduce"}) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java index b3c5de3c36745..29c300cba4028 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.common.Abortable; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.RawComparator; @@ -456,6 +457,13 @@ public void collect(OUTKEY key, OUTVALUE value) out.close(reporter); out = null; + } catch (IOException ioe) { + if (out instanceof Abortable) { + try { + ((Abortable) out).abort(); + } catch (IOException ignored){} + } + throw ioe; } finally { IOUtils.cleanupWithLogger(LOG, reducer); closeQuietly(out, reporter); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java index 6b3a671c6d472..38a0166513c79 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java @@ -135,7 +135,7 @@ static public Class getSequenceFileOutputValueClass(JobConf codec, progress); - return new RecordWriter() { + return new AbortableRecordWriter() { private WritableValueBytes wvaluebytes = new WritableValueBytes(); @@ -151,7 +151,11 @@ public void close(Reporter reporter) throws IOException { out.close(); } - }; + @Override + public void abort() throws IOException { + out.abort(); + } + }; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java index d1040a6229cc0..f53002969dede 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java @@ -68,7 +68,7 @@ public RecordWriter getRecordWriter( codec, progress); - return new RecordWriter() { + return new AbortableRecordWriter() { public void write(K key, V value) throws IOException { @@ -77,7 +77,12 @@ public void write(K key, V value) } public void close(Reporter reporter) throws IOException { out.close();} - }; + + @Override + public void abort() throws IOException { + out.abort(); + } + }; } /** Open the output generated by this format. 
*/ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TextOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TextOutputFormat.java index bf2f44733d2a3..e34c1091dc2be 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TextOutputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TextOutputFormat.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.common.Abortable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; @@ -42,7 +43,7 @@ public class TextOutputFormat extends FileOutputFormat { protected static class LineRecordWriter - implements RecordWriter { + implements RecordWriter, Abortable { private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); @@ -97,6 +98,15 @@ public synchronized void write(K key, V value) public synchronized void close(Reporter reporter) throws IOException { out.close(); } + + @Override + public void abort() throws IOException { + if (out instanceof Abortable) { + ((Abortable) out).abort(); + } else { + out.close(); + } + } } public RecordWriter getRecordWriter(FileSystem ignored, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java index 90ce57aa36cc8..b89f2bb63d785 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java @@ -26,6 +26,8 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.common.Abortable; +import org.apache.hadoop.mapred.AbortableRecordWriter; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.RecordWriter; @@ -79,7 +81,7 @@ public RecordWriter getRecordWriter(FileSystem fs, JobConf job, final JobConf myJob = job; final Progressable myProgressable = arg3; - return new RecordWriter() { + return new AbortableRecordWriter() { // a cache storing the record writers for different output files. TreeMap> recordWriters = new TreeMap>(); @@ -115,6 +117,17 @@ public void close(Reporter reporter) throws IOException { } this.recordWriters.clear(); }; + + @Override + public void abort() throws IOException { + Iterator keys = this.recordWriters.keySet().iterator(); + while (keys.hasNext()) { + RecordWriter rw = this.recordWriters.get(keys.next()); + if (rw instanceof Abortable) { + ((Abortable) rw).abort(); + } + } + } }; }