Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.common.Abortable;

/** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FSDataOutputStream extends DataOutputStream
implements Syncable, CanSetDropBehind, StreamCapabilities {
implements Syncable, CanSetDropBehind, Abortable, StreamCapabilities {
private final OutputStream wrappedStream;

private static class PositionCache extends FilterOutputStream {
Expand Down Expand Up @@ -101,6 +102,15 @@ public void close() throws IOException {
out.close(); // This invokes PositionCache.close()
}

@Override
public void abort() throws IOException {
if (wrappedStream instanceof Abortable) {
((Abortable) wrappedStream).abort();
} else {
wrappedStream.close();
}
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.apache.hadoop.fs.common;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience.Private;

@Private
public interface Abortable {
/*
* Instruct the object to abort it's current transaction. This will also close
* the object.
*/
public void abort() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.common.Abortable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
Expand Down Expand Up @@ -71,7 +72,7 @@ public class MapFile {
protected MapFile() {} // no public ctor

/** Writes a new map. */
public static class Writer implements java.io.Closeable {
public static class Writer implements java.io.Closeable, Abortable {
private SequenceFile.Writer data;
private SequenceFile.Writer index;

Expand Down Expand Up @@ -285,6 +286,15 @@ public Writer(Configuration conf,
this.index = SequenceFile.createWriter(conf, indexOptions);
}

@Override
public void abort() throws IOException {
try {
data.abort();
} finally {
index.abort();
}
}

/** The number of entries that are added before an index entry is added.*/
public int getIndexInterval() { return indexInterval; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.security.MessageDigest;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.common.Abortable;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options.CreateOpts;
Expand Down Expand Up @@ -834,7 +835,7 @@ public String toString() {
}

/** Write key/value pairs to a sequence-format file. */
public static class Writer implements java.io.Closeable, Syncable {
public static class Writer implements java.io.Closeable, Syncable, Abortable {
private Configuration conf;
FSDataOutputStream out;
boolean ownOutputStream = true;
Expand Down Expand Up @@ -875,6 +876,15 @@ public static class Writer implements java.io.Closeable, Syncable {
}
}

@Override
public void abort() throws IOException {
if (out instanceof Abortable) {
((Abortable) out).abort();
} else {
out.close();
}
}

public static interface Option {}

static class FileOption extends Options.PathOption
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
import static org.junit.Assert.fail;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -487,6 +491,19 @@ private void sortMetadataTest(FileSystem fs, Path unsortedFile, Path sortedFile,
sorter.sort(new Path[] { unsortedFile }, sortedFile, false);
}

@Test
public void testAbort() throws IOException {
Configuration conf = new Configuration();
LocalFileSystem fs = FileSystem.getLocal(conf);
Path path1 = new Path(System.getProperty("test.build.data",".")+"/test2.seq");
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path1,
Text.class, NullWritable.class, CompressionType.BLOCK);
writer.out = mock(FSDataOutputStream.class);
writer.append(new Text("file1-1"), NullWritable.get());
writer.abort();
verify(writer.out, times(1)).abort();
}

@SuppressWarnings("deprecation")
@Test
public void testClose() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.apache.hadoop.mapred;

import org.apache.hadoop.fs.common.Abortable;

public abstract class AbortableRecordWriter<K, V> implements RecordWriter<K, V>, Abortable {
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public RecordWriter<WritableComparable, Writable> getRecordWriter(FileSystem ign
compressionType, codec,
progress);

return new RecordWriter<WritableComparable, Writable>() {
return new AbortableRecordWriter<WritableComparable, Writable>() {

public void write(WritableComparable key, Writable value)
throws IOException {
Expand All @@ -78,7 +78,12 @@ public void write(WritableComparable key, Writable value)
}

public void close(Reporter reporter) throws IOException { out.close();}
};

@Override
public void abort() throws IOException {
out.abort();
}
};
}

/** Open the output generated by this format. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.common.Abortable;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
Expand Down Expand Up @@ -461,6 +462,7 @@ void runOldMapper(final JobConf job,
MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
ReflectionUtils.newInstance(job.getMapRunnerClass(), job);

boolean successful = false;
try {
runner.run(in, new OldOutputCollector(collector, conf), reporter);
mapPhase.complete();
Expand All @@ -469,6 +471,7 @@ void runOldMapper(final JobConf job,
setPhase(TaskStatus.Phase.SORT);
}
statusUpdate(umbilical);
successful = true;
collector.flush();

in.close();
Expand All @@ -477,6 +480,13 @@ void runOldMapper(final JobConf job,
collector.close();
collector = null;
} finally {
try {
if (!successful && collector instanceof Abortable) {
((Abortable) collector).abort();
}
} catch (Exception ex) {
LOG.error(ex.getMessage());
}
closeQuietly(in);
closeQuietly(collector);
}
Expand Down Expand Up @@ -811,7 +821,7 @@ void runNewMapper(final JobConf job,
}

class DirectMapOutputCollector<K, V>
implements MapOutputCollector<K, V> {
implements MapOutputCollector<K, V>, Abortable {

private RecordWriter<K, V> out = null;

Expand Down Expand Up @@ -881,6 +891,15 @@ private long getOutputBytes(List<Statistics> stats) {
}
return bytesWritten;
}

@Override
public void abort() throws IOException {
if (out instanceof Abortable) {
((Abortable) out).abort();
} else {
out.close(reporter);
}
}
}

@InterfaceAudience.LimitedPrivate({"MapReduce"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.common.Abortable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
Expand Down Expand Up @@ -456,6 +457,13 @@ public void collect(OUTKEY key, OUTVALUE value)

out.close(reporter);
out = null;
} catch (IOException ioe) {
if (out instanceof Abortable) {
try {
((Abortable) out).abort();
} catch (IOException ignored){}
}
throw ioe;
} finally {
IOUtils.cleanupWithLogger(LOG, reducer);
closeQuietly(out, reporter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ static public Class<? extends Writable> getSequenceFileOutputValueClass(JobConf
codec,
progress);

return new RecordWriter<BytesWritable, BytesWritable>() {
return new AbortableRecordWriter<BytesWritable, BytesWritable>() {

private WritableValueBytes wvaluebytes = new WritableValueBytes();

Expand All @@ -151,7 +151,11 @@ public void close(Reporter reporter) throws IOException {
out.close();
}

};
@Override
public void abort() throws IOException {
out.abort();
}
};

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public RecordWriter<K, V> getRecordWriter(
codec,
progress);

return new RecordWriter<K, V>() {
return new AbortableRecordWriter<K, V>() {

public void write(K key, V value)
throws IOException {
Expand All @@ -77,7 +77,12 @@ public void write(K key, V value)
}

public void close(Reporter reporter) throws IOException { out.close();}
};

@Override
public void abort() throws IOException {
out.abort();
}
};
}

/** Open the output generated by this format. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.common.Abortable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
Expand All @@ -42,7 +43,7 @@
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {

protected static class LineRecordWriter<K, V>
implements RecordWriter<K, V> {
implements RecordWriter<K, V>, Abortable {
private static final byte[] NEWLINE =
"\n".getBytes(StandardCharsets.UTF_8);

Expand Down Expand Up @@ -97,6 +98,15 @@ public synchronized void write(K key, V value)
public synchronized void close(Reporter reporter) throws IOException {
out.close();
}

@Override
public void abort() throws IOException {
if (out instanceof Abortable) {
((Abortable) out).abort();
} else {
out.close();
}
}
}

public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.common.Abortable;
import org.apache.hadoop.mapred.AbortableRecordWriter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
Expand Down Expand Up @@ -79,7 +81,7 @@ public RecordWriter<K, V> getRecordWriter(FileSystem fs, JobConf job,
final JobConf myJob = job;
final Progressable myProgressable = arg3;

return new RecordWriter<K, V>() {
return new AbortableRecordWriter<K, V>() {

// a cache storing the record writers for different output files.
TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap<String, RecordWriter<K, V>>();
Expand Down Expand Up @@ -115,6 +117,17 @@ public void close(Reporter reporter) throws IOException {
}
this.recordWriters.clear();
};

@Override
public void abort() throws IOException {
Iterator<String> keys = this.recordWriters.keySet().iterator();
while (keys.hasNext()) {
RecordWriter<K, V> rw = this.recordWriters.get(keys.next());
if (rw instanceof Abortable) {
((Abortable) rw).abort();
}
}
}
};
}

Expand Down