Merge pull request #60 from GMOD/htsget
Htsget data fetching
cmdcolin authored Aug 19, 2020
2 parents bb7cd26 + d0b943e commit 42992fb
Showing 16 changed files with 331 additions and 87 deletions.
1 change: 1 addition & 0 deletions .eslintrc
@@ -15,6 +15,7 @@
"curly": "error",
"@typescript-eslint/no-explicit-any": 0,
"@typescript-eslint/explicit-function-return-type": 0,
"@typescript-eslint/ban-ts-ignore": 0,
"semi": [
"error",
"never"
10 changes: 7 additions & 3 deletions package.json
@@ -41,15 +41,15 @@
"dependencies": {
"@babel/runtime-corejs3": "^7.5.5",
"@gmod/bgzf-filehandle": "^1.3.3",
"@types/long": "^4.0.0",
"@types/node": "^12.7.8",
"abortable-promise-cache": "^1.2.0",
"buffer-crc32": "^0.2.13",
"cross-fetch": "^3.0.2",
"es6-promisify": "^6.0.1",
"generic-filehandle": "^2.0.0",
"long": "^4.0.0",
"object.entries-ponyfill": "^1.0.1",
"quick-lru": "^2.0.0"
"quick-lru": "^2.0.0",
"range-parser": "^1.2.1"
},
"devDependencies": {
"@babel/cli": "^7.2.3",
@@ -58,6 +58,10 @@
"@babel/plugin-transform-runtime": "^7.2.0",
"@babel/preset-env": "^7.3.1",
"@babel/preset-typescript": "^7.6.0",
"@types/buffer-crc32": "^0.2.0",
"@types/long": "^4.0.0",
"@types/node": "^12.7.8",
"@types/range-parser": "^1.2.3",
"@typescript-eslint/eslint-plugin": "^2.3.1",
"@typescript-eslint/parser": "^2.3.1",
"babel-jest": "^24.1.0",
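
The new runtime dependency here, range-parser, presumably supports the htsget data fetching this commit adds (the htsget code itself is not among the loaded sections below), while the @types/buffer-crc32 entry backs the new crc32 import in src/bamFile.ts. As a rough illustration of what range-parser provides — a real library, though its exact use in the htsget fetcher is an assumption — it resolves an HTTP Range header against a known resource size:

import parseRange from 'range-parser'

// Resolve "bytes=0-65535" against a 10 MB resource.
// Returns -1 for an unsatisfiable range, -2 for a malformed header,
// otherwise an array of { start, end } ranges with a `type` property.
const ranges = parseRange(10_000_000, 'bytes=0-65535')
if (ranges !== -1 && ranges !== -2) {
  console.log(ranges.type, ranges[0].start, ranges[0].end) // bytes 0 65535
}
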
22 changes: 11 additions & 11 deletions src/bai.ts
@@ -3,7 +3,7 @@ import VirtualOffset, { fromBytes } from './virtualOffset'
import Chunk from './chunk'

import IndexFile from './indexFile'
import { longToNumber, abortBreakPoint, canMergeBlocks } from './util'
import { longToNumber, abortBreakPoint, canMergeBlocks, BaseOpts } from './util'

const BAI_MAGIC = 21578050 // BAI\1

@@ -22,8 +22,9 @@ export default class BAI extends IndexFile {
return { lineCount }
}

async lineCount(refId: number) {
const index = (await this.parse()).indices[refId]
async lineCount(refId: number, opts: BaseOpts = {}) {
const prom = await this.parse(opts)
const index = prom.indices[refId]
if (!index) {
return -1
}
@@ -32,11 +33,9 @@
}

// fetch and parse the index
async _parse(abortSignal?: AbortSignal) {
async _parse(opts: BaseOpts = {}) {
const data: { [key: string]: any } = { bai: true, maxBlockSize: 1 << 16 }
const bytes = (await this.filehandle.readFile({
signal: abortSignal,
})) as Buffer
const bytes = (await this.filehandle.readFile(opts)) as Buffer

// check BAI magic numbers
if (bytes.readUInt32LE(0) !== BAI_MAGIC) {
@@ -51,7 +50,7 @@
data.indices = new Array(data.refCount)
let currOffset = 8
for (let i = 0; i < data.refCount; i += 1) {
await abortBreakPoint(abortSignal)
await abortBreakPoint(opts.signal)

// the binning index
const binCount = bytes.readInt32LE(currOffset)
@@ -105,10 +104,11 @@
seqId: number,
start?: number,
end?: number,
opts: BaseOpts = {},
): Promise<{ start: number; end: number; score: number }[]> {
const v = 16384
const range = start !== undefined
const indexData = await this.parse()
const indexData = await this.parse(opts)
const seqIdx = indexData.indices[seqId]
if (!seqIdx) {
return []
@@ -168,12 +168,12 @@
return list
}

async blocksForRange(refId: number, min: number, max: number) {
async blocksForRange(refId: number, min: number, max: number, opts: BaseOpts = {}) {
if (min < 0) {
min = 0
}

const indexData = await this.parse()
const indexData = await this.parse(opts)
if (!indexData) {
return []
}
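
Taken together, the bai.ts changes thread a single BaseOpts object (an optional AbortSignal plus any other fetch options) through lineCount, _parse, indexCov, and blocksForRange instead of passing a bare signal. A minimal usage sketch under that reading — the { filehandle } constructor form follows the existing IndexFile convention and is not part of this diff:

import { LocalFile } from 'generic-filehandle'
import BAI from './bai'

async function example() {
  // example .bai path, purely illustrative
  const bai = new BAI({ filehandle: new LocalFile('volvox-sorted.bam.bai') })
  const controller = new AbortController()

  // blocksForRange(refId, min, max, opts) — abortable via opts.signal
  const blocks = await bai.blocksForRange(0, 0, 50_000, { signal: controller.signal })
  console.log(blocks.length)
}
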
78 changes: 37 additions & 41 deletions src/bamFile.ts
@@ -2,6 +2,7 @@ import AbortablePromiseCache from 'abortable-promise-cache'
import BAI from './bai'
import CSI from './csi'
import Chunk from './chunk'
import crc32 from 'buffer-crc32'

import { unzip, unzipChunkSlice } from '@gmod/bgzf-filehandle'

@@ -11,30 +12,22 @@ import { LocalFile, RemoteFile, GenericFilehandle } from 'generic-filehandle'
import BAMFeature from './record'
import IndexFile from './indexFile'
import { parseHeaderText } from './sam'
import { abortBreakPoint, checkAbortSignal, timeout } from './util'
import { abortBreakPoint, checkAbortSignal, timeout, makeOpts, BamOpts, BaseOpts } from './util'

const BAM_MAGIC = 21840194
export const BAM_MAGIC = 21840194

const blockLen = 1 << 16
type G = GenericFilehandle

interface BamOpts {
viewAsPairs?: boolean
pairAcrossChr?: boolean
maxInsertSize?: number
signal?: AbortSignal
}

export default class BamFile {
private renameRefSeq: (a: string) => string
private bam: GenericFilehandle
private index: IndexFile
private featureCache: any
private chunkSizeLimit: number
private fetchSizeLimit: number
private header: any
private chrToIndex: any
private indexToChr: any
protected featureCache: any
protected chrToIndex: any
protected indexToChr: any

/**
* @param {object} args
@@ -112,14 +105,14 @@ export default class BamFile {
this.chunkSizeLimit = chunkSizeLimit || 300000000 // 300MB
}

async getHeader(abortSignal?: AbortSignal) {
const indexData = await this.index.parse(abortSignal)
async getHeader(origOpts: AbortSignal | BaseOpts = {}) {
const opts = makeOpts(origOpts)
const indexData = await this.index.parse(opts)
const ret = indexData.firstDataLine ? indexData.firstDataLine.blockPosition + 65535 : undefined
let buffer
if (ret) {
const res = await this.bam.read(Buffer.alloc(ret + blockLen), 0, ret + blockLen, 0, {
signal: abortSignal,
})
const res = await this.bam.read(Buffer.alloc(ret + blockLen), 0, ret + blockLen, 0, opts)

const { bytesRead } = res
;({ buffer } = res)
if (!bytesRead) {
@@ -131,7 +124,7 @@ export default class BamFile {
buffer = buffer.slice(0, ret)
}
} else {
buffer = (await this.bam.readFile({ signal: abortSignal })) as Buffer
buffer = (await this.bam.readFile(opts)) as Buffer
}

const uncba = await unzip(buffer)
@@ -142,7 +135,7 @@ export default class BamFile {
const headLen = uncba.readInt32LE(4)

this.header = uncba.toString('utf8', 8, 8 + headLen)
const { chrToIndex, indexToChr } = await this._readRefSeqs(headLen + 8, 65535, abortSignal)
const { chrToIndex, indexToChr } = await this._readRefSeqs(headLen + 8, 65535, opts)
this.chrToIndex = chrToIndex
this.indexToChr = indexToChr

@@ -154,17 +147,15 @@ export default class BamFile {
async _readRefSeqs(
start: number,
refSeqBytes: number,
abortSignal?: AbortSignal,
opts: BaseOpts = {},
): Promise<{
chrToIndex: { [key: string]: number }
indexToChr: { refName: string; length: number }[]
}> {
if (start > refSeqBytes) {
return this._readRefSeqs(start, refSeqBytes * 2)
return this._readRefSeqs(start, refSeqBytes * 2, opts)
}
const res = await this.bam.read(Buffer.alloc(refSeqBytes + blockLen), 0, refSeqBytes, 0, {
signal: abortSignal,
})
const res = await this.bam.read(Buffer.alloc(refSeqBytes + blockLen), 0, refSeqBytes, 0, opts)
const { bytesRead } = res
let { buffer } = res
if (!bytesRead) {
@@ -181,7 +172,7 @@ export default class BamFile {
const chrToIndex: { [key: string]: number } = {}
const indexToChr: { refName: string; length: number }[] = []
for (let i = 0; i < nRef; i += 1) {
await abortBreakPoint(abortSignal)
await abortBreakPoint(opts.signal)
const lName = uncba.readInt32LE(p)
let refName = uncba.toString('utf8', p + 4, p + 4 + lName - 1)
refName = this.renameRefSeq(refName)
@@ -193,7 +184,7 @@ export default class BamFile {
p = p + 8 + lName
if (p > uncba.length) {
console.warn(`BAM header is very big. Re-fetching ${refSeqBytes} bytes.`)
return this._readRefSeqs(start, refSeqBytes * 2)
return this._readRefSeqs(start, refSeqBytes * 2, opts)
}
}
return { chrToIndex, indexToChr }
@@ -269,7 +260,7 @@ export default class BamFile {
const c = chunks[i]
const { data, cpositions, dpositions, chunk } = await this.featureCache.get(
c.toString(),
c,
{ chunk: c, opts },
opts.signal,
)
const promise = this.readBamFeatures(data, cpositions, dpositions, chunk).then(records => {
@@ -376,7 +367,7 @@ export default class BamFile {
const mateFeatPromises = mateChunks.map(async c => {
const { data, cpositions, dpositions, chunk } = await this.featureCache.get(
c.toString(),
c,
{ chunk: c, opts },
opts.signal,
)
const feats = await this.readBamFeatures(data, cpositions, dpositions, chunk)
@@ -398,11 +389,10 @@ export default class BamFile {
return featuresRet
}

async _readChunk(chunk: Chunk, abortSignal?: AbortSignal) {
const bufsize = chunk.fetchedSize()
const res = await this.bam.read(Buffer.alloc(bufsize), 0, bufsize, chunk.minv.blockPosition, {
signal: abortSignal,
})
async _readChunk({ chunk, opts }: { chunk: unknown; opts: BaseOpts }, abortSignal?: AbortSignal) {
const c = chunk as Chunk
const bufsize = c.fetchedSize()
const res = await this.bam.read(Buffer.alloc(bufsize), 0, bufsize, c.minv.blockPosition, opts)
const { bytesRead } = res
let { buffer } = res
checkAbortSignal(abortSignal)
@@ -431,8 +421,10 @@ export default class BamFile {
const blockEnd = blockStart + 4 + blockSize - 1

// increment position to the current decompressed status
while (blockStart + chunk.minv.dataPosition >= dpositions[pos++]) {}
pos--
if (dpositions) {
while (blockStart + chunk.minv.dataPosition >= dpositions[pos++]) {}
pos--
}

// only try to read the feature if we have all the bytes for it
if (blockEnd < ba.length) {
@@ -442,6 +434,9 @@ export default class BamFile {
start: blockStart,
end: blockEnd,
},
// the below results in an automatically calculated file-offset-based ID
// if the info for that is available, otherwise a crc32 of the feature bytes
//
// cpositions[pos] refers to the actual file offset of a bgzip block boundary
//
// we multiply by (1 <<8) in order to make sure each block has a "unique"
@@ -455,11 +450,12 @@ export default class BamFile {
// starts at 0 instead of chunk.minv.dataPosition
//
// the +1 is just to avoid any possible uniqueId 0 but this does not realistically happen
fileOffset:
cpositions[pos] * (1 << 8) +
(blockStart - dpositions[pos]) +
chunk.minv.dataPosition +
1,
fileOffset: cpositions
? cpositions[pos] * (1 << 8) +
(blockStart - dpositions[pos]) +
chunk.minv.dataPosition +
1
: crc32.signed(ba.slice(blockStart, blockEnd)),
})

sink.push(feature)
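
In bamFile.ts the same pattern reaches the public entry point: getHeader now accepts either a bare AbortSignal or a BaseOpts object (normalized through makeOpts), the chunk cache passes { chunk, opts } through to _readChunk, and a record's uniqueId falls back to a crc32 of its bytes when cpositions are unavailable — presumably the htsget case, where bgzip file offsets are not known. A short sketch of the caller-facing part; the named export from the package index is assumed rather than shown in this diff:

import { BamFile } from '@gmod/bam' // assumed entry point; this diff only shows src/bamFile.ts

async function header() {
  const bam = new BamFile({
    bamPath: 'volvox-sorted.bam', // illustrative paths
    baiPath: 'volvox-sorted.bam.bai',
  })
  const controller = new AbortController()

  // both call forms are accepted after this change
  await bam.getHeader(controller.signal)
  await bam.getHeader({ signal: controller.signal })
}
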
9 changes: 5 additions & 4 deletions src/csi.ts
@@ -99,9 +99,10 @@ export default class CSI extends IndexFile {
}

// fetch and parse the index
async _parse(abortSignal?: AbortSignal) {
async _parse(opts: { signal?: AbortSignal }) {
const data: { [key: string]: any } = { csi: true, maxBlockSize: 1 << 16 }
const bytes = await unzip((await this.filehandle.readFile({ signal: abortSignal })) as Buffer)
const buffer = (await this.filehandle.readFile(opts)) as Buffer
const bytes = await unzip(buffer)

// check TBI magic numbers
if (bytes.readUInt32LE(0) === CSI1_MAGIC) {
@@ -126,7 +127,7 @@ export default class CSI extends IndexFile {
data.indices = new Array(data.refCount)
let currOffset = 16 + auxLength + 4
for (let i = 0; i < data.refCount; i += 1) {
await abortBreakPoint(abortSignal)
await abortBreakPoint(opts.signal)
// the binning index
const binCount = bytes.readInt32LE(currOffset)
currOffset += 4
@@ -184,7 +185,7 @@ export default class CSI extends IndexFile {
beg = 0
}

const indexData = await this.parse(opts.signal)
const indexData = await this.parse(opts)
if (!indexData) {
return []
}
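
csi.ts mirrors bai.ts: _parse takes an opts object, and blocksForRange now hands the whole opts object (not just opts.signal) down to parse. The AbortSignal-or-object flexibility lives one level up, in BamFile.getHeader, which normalizes its argument via makeOpts from src/util.ts — a file not included in the excerpt above. A plausible sketch of that helper, offered as an assumption rather than the actual implementation:

// Hypothetical sketch — the real makeOpts lives in src/util.ts, not shown in this diff.
interface BaseOpts {
  signal?: AbortSignal
}

function makeOpts(obj: AbortSignal | BaseOpts = {}): BaseOpts {
  // an AbortSignal has an `aborted` property; a plain options object does not
  return 'aborted' in obj ? { signal: obj as AbortSignal } : obj
}
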
(Diffs for the remaining 11 changed files in this commit were not loaded.)
