Skip to content

Commit

Permalink
adding a useAsynchronousIO flag to SamReaderFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
akiezun committed May 11, 2016
1 parent f54ef22 commit 5e87478
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 11 deletions.
21 changes: 17 additions & 4 deletions src/java/htsjdk/samtools/BAMFileReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ class BAMFileReader extends SamReader.ReaderImplementation {
// If true, all SAMRecords are fully decoded as they are read.
private boolean eagerDecode;

// If true, the BAMFileReader will use asynchronous IO.
// Note: this field currently has no effect (it is not hooked up anywhere yet), but it will be used in the future. See https://github.com/samtools/htsjdk/pull/576
private final boolean useAsynchronousIO;

// For error-checking.
private ValidationStringency mValidationStringency;

Expand Down Expand Up @@ -97,11 +101,13 @@ class BAMFileReader extends SamReader.ReaderImplementation {
BAMFileReader(final InputStream stream,
final File indexFile,
final boolean eagerDecode,
final boolean useAsynchronousIO,
final ValidationStringency validationStringency,
final SAMRecordFactory factory)
throws IOException {
mIndexFile = indexFile;
mIsSeekable = false;
this.useAsynchronousIO = useAsynchronousIO;
mCompressedInputStream = new BlockCompressedInputStream(stream);
mStream = new BinaryCodec(new DataInputStream(mCompressedInputStream));
this.eagerDecode = eagerDecode;
Expand All @@ -119,10 +125,11 @@ class BAMFileReader extends SamReader.ReaderImplementation {
BAMFileReader(final File file,
final File indexFile,
final boolean eagerDecode,
final boolean useAsynchronousIO,
final ValidationStringency validationStringency,
final SAMRecordFactory factory)
throws IOException {
this(new BlockCompressedInputStream(file), indexFile!=null ? indexFile : SamFiles.findIndex(file), eagerDecode, file.getAbsolutePath(), validationStringency, factory);
this(new BlockCompressedInputStream(file), indexFile!=null ? indexFile : SamFiles.findIndex(file), eagerDecode, useAsynchronousIO, file.getAbsolutePath(), validationStringency, factory);
if (mIndexFile != null && mIndexFile.lastModified() < file.lastModified()) {
System.err.println("WARNING: BAM index file " + mIndexFile.getAbsolutePath() +
" is older than BAM " + file.getAbsolutePath());
Expand All @@ -134,24 +141,27 @@ class BAMFileReader extends SamReader.ReaderImplementation {
BAMFileReader(final SeekableStream strm,
final File indexFile,
final boolean eagerDecode,
final boolean useAsynchronousIO,
final ValidationStringency validationStringency,
final SAMRecordFactory factory)
throws IOException {
this(new BlockCompressedInputStream(strm), indexFile, eagerDecode, strm.getSource(), validationStringency, factory);
this(new BlockCompressedInputStream(strm), indexFile, eagerDecode, useAsynchronousIO, strm.getSource(), validationStringency, factory);
}

BAMFileReader(final SeekableStream strm,
final SeekableStream indexStream,
final boolean eagerDecode,
final boolean useAsynchronousIO,
final ValidationStringency validationStringency,
final SAMRecordFactory factory)
throws IOException {
this(new BlockCompressedInputStream(strm), indexStream, eagerDecode, strm.getSource(), validationStringency, factory);
this(new BlockCompressedInputStream(strm), indexStream, eagerDecode, useAsynchronousIO, strm.getSource(), validationStringency, factory);
}

private BAMFileReader(final BlockCompressedInputStream compressedInputStream,
final File indexFile,
final boolean eagerDecode,
final boolean useAsynchronousIO,
final String source,
final ValidationStringency validationStringency,
final SAMRecordFactory factory)
Expand All @@ -161,6 +171,7 @@ private BAMFileReader(final BlockCompressedInputStream compressedInputStream,
mCompressedInputStream = compressedInputStream;
mStream = new BinaryCodec(new DataInputStream(mCompressedInputStream));
this.eagerDecode = eagerDecode;
this.useAsynchronousIO = useAsynchronousIO;
this.mValidationStringency = validationStringency;
this.samRecordFactory = factory;
this.mFileHeader = readHeader(this.mStream, this.mValidationStringency, source);
Expand All @@ -170,6 +181,7 @@ private BAMFileReader(final BlockCompressedInputStream compressedInputStream,
private BAMFileReader(final BlockCompressedInputStream compressedInputStream,
final SeekableStream indexStream,
final boolean eagerDecode,
final boolean useAsynchronousIO,
final String source,
final ValidationStringency validationStringency,
final SAMRecordFactory factory)
Expand All @@ -179,6 +191,7 @@ private BAMFileReader(final BlockCompressedInputStream compressedInputStream,
mCompressedInputStream = compressedInputStream;
mStream = new BinaryCodec(new DataInputStream(mCompressedInputStream));
this.eagerDecode = eagerDecode;
this.useAsynchronousIO = useAsynchronousIO;
this.mValidationStringency = validationStringency;
this.samRecordFactory = factory;
this.mFileHeader = readHeader(this.mStream, this.mValidationStringency, source);
Expand All @@ -187,7 +200,7 @@ private BAMFileReader(final BlockCompressedInputStream compressedInputStream,

/** Reads through the header and sequence records to find the virtual file offset of the first record in the BAM file. */
static long findVirtualOffsetOfFirstRecord(final File bam) throws IOException {
final BAMFileReader reader = new BAMFileReader(bam, null, false, ValidationStringency.SILENT, new DefaultSAMRecordFactory());
final BAMFileReader reader = new BAMFileReader(bam, null, false, false, ValidationStringency.SILENT, new DefaultSAMRecordFactory());
final long offset = reader.mFirstRecordPointer;
reader.close();
return offset;
Expand Down
2 changes: 1 addition & 1 deletion src/java/htsjdk/samtools/BAMIndexMetaData.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ long getLastOffset() {
*/
static public void printIndexStats(final File inputBamFile) {
try {
final BAMFileReader bam = new BAMFileReader(inputBamFile, null, false, ValidationStringency.SILENT, new DefaultSAMRecordFactory());
final BAMFileReader bam = new BAMFileReader(inputBamFile, null, false, false, ValidationStringency.SILENT, new DefaultSAMRecordFactory());
if (!bam.hasIndex()) {
throw new SAMException("No index for bam file " + inputBamFile);
}
Expand Down
16 changes: 12 additions & 4 deletions src/java/htsjdk/samtools/SAMFileReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public static SAMSequenceDictionary getSequenceDictionary(final File dictionaryF
private BAMIndex mIndex = null;
private SAMRecordFactory samRecordFactory = new DefaultSAMRecordFactory();
private ReaderImplementation mReader = null;
private boolean useAsyncIO = Defaults.USE_ASYNC_IO_FOR_SAMTOOLS;

private File samFile = null;

Expand Down Expand Up @@ -202,6 +203,13 @@ public void close() {
mIndex = null;
}

/**
* Sets whether this reader will use asynchronous IO.
* The stored value is passed to the {@link BAMFileReader} constructor when the underlying BAM reader is created.
* If this method is not called, the flag keeps its initial value, {@link Defaults#USE_ASYNC_IO_FOR_SAMTOOLS}.
*
* @param useAsyncIO if true, this reader will use asynchronous IO.
*/
public void setUseAsyncIO(final boolean useAsyncIO) {
this.useAsyncIO = useAsyncIO;
}

/**
* If true, writes the source of every read into the source SAMRecords.
*
Expand Down Expand Up @@ -593,7 +601,7 @@ private void init(final SeekableStream strm, final File indexFile, final boolean
try {
if (streamLooksLikeBam(strm)) {
mIsBinary = true;
mReader = new BAMFileReader(strm, indexFile, eagerDecode, validationStringency, this.samRecordFactory);
mReader = new BAMFileReader(strm, indexFile, eagerDecode, useAsyncIO, validationStringency, this.samRecordFactory);
} else {
throw new SAMFormatException("Unrecognized file format: " + strm);
}
Expand All @@ -609,7 +617,7 @@ private void init(final SeekableStream strm, final SeekableStream indexStream, f
try {
if (streamLooksLikeBam(strm)) {
mIsBinary = true;
mReader = new BAMFileReader(strm, indexStream, eagerDecode, validationStringency, this.samRecordFactory);
mReader = new BAMFileReader(strm, indexStream, eagerDecode, useAsyncIO, validationStringency, this.samRecordFactory);
} else {
throw new SAMFormatException("Unrecognized file format: " + strm);
}
Expand Down Expand Up @@ -645,10 +653,10 @@ private void init(final InputStream stream, File file, final File indexFile, fin
mIsBinary = true;
if (file == null || !file.isFile()) {
// Handle case in which file is a named pipe, e.g. /dev/stdin or created by mkfifo
mReader = new BAMFileReader(bufferedStream, indexFile, eagerDecode, validationStringency, this.samRecordFactory);
mReader = new BAMFileReader(bufferedStream, indexFile, eagerDecode, useAsyncIO, validationStringency, this.samRecordFactory);
} else {
bufferedStream.close();
mReader = new BAMFileReader(file, indexFile, eagerDecode, validationStringency, this.samRecordFactory);
mReader = new BAMFileReader(file, indexFile, eagerDecode, useAsyncIO, validationStringency, this.samRecordFactory);
}
} else if (BlockCompressedInputStream.isValidFile(bufferedStream)) {
mIsBinary = false;
Expand Down
18 changes: 16 additions & 2 deletions src/java/htsjdk/samtools/SamReaderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ public abstract class SamReaderFactory {
/** Set this factory's {@link ValidationStringency} to the provided one, then returns itself. */
abstract public SamReaderFactory validationStringency(final ValidationStringency validationStringency);

/** Set whether readers created by this factory will use asynchronous IO.
* If this method is not called, this flag will default to the value of {@link Defaults#USE_ASYNC_IO_FOR_SAMTOOLS}.
* Note that this option may not be applicable to all readers returned from this factory.
* Returns the factory itself. */
abstract public SamReaderFactory setUseAsyncIo(final boolean asynchronousIO);

private static SamReaderFactoryImpl DEFAULT =
new SamReaderFactoryImpl(Option.DEFAULTS, defaultValidationStringency, DefaultSAMRecordFactory.getInstance());

Expand All @@ -115,6 +121,7 @@ private static class SamReaderFactoryImpl extends SamReaderFactory {
private final static Log LOG = Log.getInstance(SamReaderFactory.class);
private final EnumSet<Option> enabledOptions;
private ValidationStringency validationStringency;
private boolean asynchronousIO = Defaults.USE_ASYNC_IO_FOR_SAMTOOLS;
private SAMRecordFactory samRecordFactory;
private CustomReaderFactory customReaderFactory;
private ReferenceSource referenceSource;
Expand Down Expand Up @@ -207,6 +214,12 @@ public SamReaderFactory validationStringency(final ValidationStringency validati
return this;
}

// Stores the asynchronous-IO preference on this factory and returns the factory itself.
// The stored flag is handed to the BAMFileReader constructors invoked from open().
@Override
public SamReaderFactory setUseAsyncIo(final boolean asynchronousIO){
this.asynchronousIO = asynchronousIO;
return this;
}

@Override
public SamReader open(final SamInputResource resource) {
final SamReader.PrimitiveSamReader primitiveSamReader;
Expand Down Expand Up @@ -236,6 +249,7 @@ public SamReader open(final SamInputResource resource) {
IOUtil.maybeBufferedSeekableStream(data.asUnbufferedSeekableStream()),
bufferedIndexStream,
false,
asynchronousIO,
validationStringency,
this.samRecordFactory
);
Expand All @@ -255,10 +269,10 @@ public SamReader open(final SamInputResource resource) {
if (SamStreams.isBAMFile(bufferedStream)) {
if (sourceFile == null || !sourceFile.isFile()) {
// Handle case in which file is a named pipe, e.g. /dev/stdin or created by mkfifo
primitiveSamReader = new BAMFileReader(bufferedStream, indexFile, false, validationStringency, this.samRecordFactory);
primitiveSamReader = new BAMFileReader(bufferedStream, indexFile, false, asynchronousIO, validationStringency, this.samRecordFactory);
} else {
bufferedStream.close();
primitiveSamReader = new BAMFileReader(sourceFile, indexFile, false, validationStringency, this.samRecordFactory);
primitiveSamReader = new BAMFileReader(sourceFile, indexFile, false, asynchronousIO, validationStringency, this.samRecordFactory);
}
} else if (BlockCompressedInputStream.isValidFile(bufferedStream)) {
primitiveSamReader = new SAMTextReader(new BlockCompressedInputStream(bufferedStream), validationStringency, this.samRecordFactory);
Expand Down

0 comments on commit 5e87478

Please sign in to comment.