Skip to content

Commit

Permalink
Load BlockCompressedIndexedFastaSequenceFile and GZIIndex from streams (#1259)
Browse files Browse the repository at this point in the history

* Allow BlockCompressedIndexedFastaSequenceFile and GZIIndex to be created from streams.
This enables fasta.gz files to be loaded from streams.
  • Loading branch information
tomwhite authored and lbergelson committed Feb 26, 2019
1 parent 205d5f0 commit 2d2922f
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
package htsjdk.samtools.reference;

import htsjdk.samtools.SAMException;
import htsjdk.samtools.SAMSequenceDictionary;
import htsjdk.samtools.seekablestream.ReadableSeekableStreamByteChannel;
import htsjdk.samtools.seekablestream.SeekablePathStream;
import htsjdk.samtools.seekablestream.SeekableStream;
import htsjdk.samtools.util.BlockCompressedInputStream;
import htsjdk.samtools.util.GZIIndex;
import htsjdk.samtools.util.IOUtil;
Expand Down Expand Up @@ -75,6 +78,23 @@ public BlockCompressedIndexedFastaSequenceFile(final Path path, final FastaSeque
}
}

/**
 * Creates a block-compressed indexed fasta reader on top of an already opened seekable stream.
 *
 * @param source     name of the reference source; passed to the superclass together with the
 *                   fasta index and dictionary (used in error messages).
 * @param in         stream over the still-compressed fasta data; it is wrapped in a
 *                   {@link BlockCompressedInputStream} here, so it should not be decompressed already.
 * @param index      the fasta index.
 * @param dictionary the sequence dictionary, or null if there isn't one.
 * @param gziIndex   the GZI index; may not be null.
 * @throws IllegalArgumentException if {@code gziIndex} is null.
 */
public BlockCompressedIndexedFastaSequenceFile(final String source, final SeekableStream in, final FastaSequenceIndex index, final SAMSequenceDictionary dictionary, final GZIIndex gziIndex) {
    super(source, index, dictionary);
    // the superclass constructor has to run first, so the GZI index can only be validated afterwards
    if (null == gziIndex) {
        throw new IllegalArgumentException("null gzi index");
    }
    this.gzindex = gziIndex;
    this.stream = new BlockCompressedInputStream(in);
}

private static GZIIndex loadFastaGziIndex(final Path path) {
try {
return GZIIndex.loadIndex(GZIIndex.resolveIndexNameForBgzipFile(path));
Expand Down
124 changes: 80 additions & 44 deletions src/main/java/htsjdk/samtools/util/GZIIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
package htsjdk.samtools.util;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ByteChannel;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -258,66 +261,99 @@ public static final GZIIndex loadIndex(final Path indexPath) throws IOException
if (indexPath == null) {
throw new IllegalArgumentException("null input path");
}
try (final ReadableByteChannel channel = Files.newByteChannel(indexPath)) {
return loadIndex(indexPath.toUri().toString(), channel);
}
}

/**
 * Loads the index from the provided input stream.
 *
 * <p>The stream is wrapped in a channel that is closed before this method returns; closing
 * that channel also closes {@code indexIn}, so the stream cannot be reused afterwards.
 *
 * @param source  the named source of the index (used in error messages). May be null if unknown.
 * @param indexIn the input stream for the index to load; may not be null.
 *
 * @return loaded index.
 *
 * @throws IllegalArgumentException if {@code indexIn} is null.
 * @throws IOException if an I/O error occurs.
 */
public static final GZIIndex loadIndex(final String source, final InputStream indexIn) throws IOException {
    if (null == indexIn) {
        throw new IllegalArgumentException("null input stream");
    }
    // delegate to the channel-based loader; try-with-resources releases the channel (and the stream)
    try (final ReadableByteChannel indexChannel = Channels.newChannel(indexIn)) {
        final GZIIndex loaded = loadIndex(source, indexChannel);
        return loaded;
    }
}

/**
* Loads the index from the provided channel.
*
* @param source The named source of the reference file (used in error messages). May be null if unknown.
* @param channel the channel to read the index from.
*
* @return loaded index.
*
* @throws IOException if an I/O error occurs.
*/
public static final GZIIndex loadIndex(final String source, final ReadableByteChannel channel) throws IOException {
// allocate a buffer for re-use for read each byte
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.order(ByteOrder.LITTLE_ENDIAN);

try (final ByteChannel channel = Files.newByteChannel(indexPath)) {
if (Long.BYTES != channel.read(buffer)) {
throw getCorruptedIndexException(indexPath, "less than " + Long.BYTES+ "bytes", null);
}
buffer.flip();
if (Long.BYTES != channel.read(buffer)) {
throw getCorruptedIndexException(source, "less than " + Long.BYTES+ "bytes", null);
}
buffer.flip();

final int numberOfEntries;
try {
numberOfEntries = Math.toIntExact(buffer.getLong());
} catch (ArithmeticException e) {
buffer.flip();
throw getCorruptedIndexException(indexPath,
String.format("HTSJDK cannot handle more than %d entries in .gzi index, but found %s",
Integer.MAX_VALUE, buffer.getLong()),
e);
}
final int numberOfEntries;
try {
numberOfEntries = Math.toIntExact(buffer.getLong());
} catch (ArithmeticException e) {
buffer.flip();
throw getCorruptedIndexException(source,
String.format("HTSJDK cannot handle more than %d entries in .gzi index, but found %s",
Integer.MAX_VALUE, buffer.getLong()),
e);
}

// allocate array with the entries and add the first one
final List<IndexEntry> entries = new ArrayList<>(numberOfEntries);
// allocate array with the entries and add the first one
final List<IndexEntry> entries = new ArrayList<>(numberOfEntries);

// create a new buffer with the correct size and read into it
buffer = allocateBuffer(numberOfEntries, false);
channel.read(buffer);
buffer.flip();
// create a new buffer with the correct size and read into it
buffer = allocateBuffer(numberOfEntries, false);
channel.read(buffer);
buffer.flip();

for (int i = 0; i < numberOfEntries; i++) {
final IndexEntry entry;
try {
entry = new IndexEntry(buffer.getLong(), buffer.getLong());
} catch (IllegalArgumentException e) {
throw getCorruptedIndexException(indexPath, e.getMessage(), e);
}
// check if the entry is increasing in order
if (i == 0) {
if (entry.getUncompressedOffset() == 0 && entry.getCompressedOffset() == 0) {
throw getCorruptedIndexException(indexPath, "first block index entry should not be present", null);
}
} else if (entries.get(i - 1).getCompressedOffset() >= entry.getCompressedOffset()
|| entries.get(i - 1).getUncompressedOffset() >= entry.getUncompressedOffset()) {
throw getCorruptedIndexException(indexPath,
String.format("index entries in misplaced order - %s vs %s",
entries.get(i - 1), entry),
null);
for (int i = 0; i < numberOfEntries; i++) {
final IndexEntry entry;
try {
entry = new IndexEntry(buffer.getLong(), buffer.getLong());
} catch (IllegalArgumentException e) {
throw getCorruptedIndexException(source, e.getMessage(), e);
}
// check if the entry is increasing in order
if (i == 0) {
if (entry.getUncompressedOffset() == 0 && entry.getCompressedOffset() == 0) {
throw getCorruptedIndexException(source, "first block index entry should not be present", null);
}

entries.add(entry);
} else if (entries.get(i - 1).getCompressedOffset() >= entry.getCompressedOffset()
|| entries.get(i - 1).getUncompressedOffset() >= entry.getUncompressedOffset()) {
throw getCorruptedIndexException(source,
String.format("index entries in misplaced order - %s vs %s",
entries.get(i - 1), entry),
null);
}

return new GZIIndex(entries);
entries.add(entry);
}

return new GZIIndex(entries);
}

private static final IOException getCorruptedIndexException(final Path indexPath, final String msg, final Exception e) {
private static final IOException getCorruptedIndexException(final String source, final String msg, final Exception e) {
return new IOException(String.format("Corrupted index file: %s (%s)",
msg,
indexPath == null ? "unknown" : indexPath.toUri()),
source == null ? "unknown" : source),
e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import htsjdk.samtools.SAMException;
import htsjdk.samtools.seekablestream.SeekableFileStream;
import htsjdk.samtools.util.CloserUtil;
import htsjdk.samtools.util.GZIIndex;
import htsjdk.samtools.util.RuntimeIOException;
import htsjdk.samtools.util.StringUtil;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
Expand All @@ -36,6 +38,7 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;

/**
* Test the indexed fasta sequence file reader.
Expand All @@ -45,6 +48,7 @@ public class AbstractIndexedFastaSequenceFileTest extends HtsjdkTest {
private static final File SEQUENCE_FILE = new File(TEST_DATA_DIR,"Homo_sapiens_assembly18.trimmed.fasta");
private static final File SEQUENCE_FILE_INDEX = new File(TEST_DATA_DIR,"Homo_sapiens_assembly18.trimmed.fasta.fai");
private static final File SEQUENCE_FILE_BGZ = new File(TEST_DATA_DIR,"Homo_sapiens_assembly18.trimmed.fasta.gz");
private static final File SEQUENCE_FILE_GZI = new File(TEST_DATA_DIR,"Homo_sapiens_assembly18.trimmed.fasta.gz.gzi");
private static final File SEQUENCE_FILE_NODICT = new File(TEST_DATA_DIR,"Homo_sapiens_assembly18.trimmed.nodict.fasta");

private final String firstBasesOfChrM = "GATCACAGGTCTATCACCCT";
Expand All @@ -66,6 +70,12 @@ public Object[][] provideSequenceFile() throws FileNotFoundException {

@DataProvider(name="comparative")
public Object[][] provideOriginalAndNewReaders() throws FileNotFoundException {
GZIIndex gziIndex;
try {
gziIndex = GZIIndex.loadIndex(SEQUENCE_FILE_GZI.toPath());
} catch (IOException e) {
throw new RuntimeIOException(e);
}
return new Object[][] {
new Object[] { ReferenceSequenceFileFactory.getReferenceSequenceFile(SEQUENCE_FILE),
new IndexedFastaSequenceFile(SEQUENCE_FILE) },
Expand All @@ -83,6 +93,11 @@ public Object[][] provideOriginalAndNewReaders() throws FileNotFoundException {
SEQUENCE_FILE_BGZ, true),
new BlockCompressedIndexedFastaSequenceFile(
SEQUENCE_FILE_BGZ.toPath()) },
new Object[] { ReferenceSequenceFileFactory.getReferenceSequenceFile(SEQUENCE_FILE_BGZ),
new BlockCompressedIndexedFastaSequenceFile(
SEQUENCE_FILE_BGZ.getAbsolutePath(), new SeekableFileStream(SEQUENCE_FILE_BGZ),
new FastaSequenceIndex(new FileInputStream(SEQUENCE_FILE_INDEX)), null,
gziIndex) },
new Object[] { ReferenceSequenceFileFactory.getReferenceSequenceFile(SEQUENCE_FILE.getAbsolutePath(),
new SeekableFileStream(SEQUENCE_FILE), new FastaSequenceIndex(new FileInputStream(SEQUENCE_FILE_INDEX))),
new IndexedFastaSequenceFile(SEQUENCE_FILE.getAbsolutePath(), new SeekableFileStream(SEQUENCE_FILE),
Expand Down
10 changes: 10 additions & 0 deletions src/test/java/htsjdk/samtools/util/GZIIndexTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.testng.annotations.Test;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Iterator;
Expand All @@ -55,6 +57,14 @@ public void testLoadIndex(final File indexFile, final int expectedBlocks) throws
Assert.assertEquals(index.getNumberOfBlocks(), expectedBlocks);
}

/**
 * Checks that loading an index through the stream-based overload yields the expected number of blocks.
 */
@Test(dataProvider = "indexFiles")
public void testLoadIndexFromStream(final File indexFile, final int expectedBlocks) throws Exception {
    try (final InputStream indexStream = new FileInputStream(indexFile)) {
        final String sourceName = indexFile.toString();
        final GZIIndex loaded = GZIIndex.loadIndex(sourceName, indexStream);
        Assert.assertEquals(loaded.getNumberOfBlocks(), expectedBlocks);
    }
}

@Test(dataProvider = "indexFiles")
public void testWriteIndex(final File indexFile, final int exprectedBlocks) throws Exception {
// load the index and write it down
Expand Down

0 comments on commit 2d2922f

Please sign in to comment.