Skip to content

Commit

Permalink
refactor(datastore): move to go-ipfs blockstore like interface
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Apr 26, 2016
1 parent c5cd93f commit 9229e57
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 119 deletions.
5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
"abstract-blob-store": "^3.2.0",
"aegir": "^2.1.2",
"async": "^1.5.2",
"bl": "^1.1.2",
"bs58": "^3.0.0",
"buffer-loader": "^0.0.1",
"chai": "^3.5.0",
"fs-blob-store": "^5.2.1",
Expand All @@ -48,6 +46,7 @@
"babel-runtime": "^6.6.1",
"bl": "^1.1.2",
"concat-stream": "^1.5.1",
"ipfs-block": "^0.2.0",
"level-js": "^2.2.3",
"lock": "^0.1.2",
"lockfile": "^1.0.1",
Expand All @@ -64,4 +63,4 @@
"dignifiedquire <[email protected]>",
"greenkeeperio-bot <[email protected]>"
]
}
}
65 changes: 46 additions & 19 deletions src/stores/datastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

const Lock = require('lock')
const stream = require('stream')
const bl = require('bl')
const Block = require('ipfs-block')

const PREFIX_LENGTH = 8

Expand All @@ -20,47 +22,72 @@ exports.setUp = (basePath, blobStore, locks) => {
const store = blobStore(basePath + '/blocks')
const lock = new Lock()

return {
createReadStream: (multihash, extension) => {
const path = multihashToPath(multihash, extension)
return store.createReadStream(path)
},
const createReadStream = (multihash, extension) => {
const path = multihashToPath(multihash, extension)
return store.createReadStream(path)
}

const createWriteStream = (multihash, extension, cb) => {
const path = multihashToPath(multihash, extension)
const through = stream.PassThrough()

lock(path, (release) => {
const ws = store.createWriteStream(path, release(cb))
through.pipe(ws)
})

return through
}

createWriteStream: (multihash, extension, cb) => {
return {
get: (key, extension, cb) => {
if (typeof extension === 'function') {
cb = extension
extension = undefined
}

const path = multihashToPath(multihash, extension)
const through = stream.PassThrough()
if (!key) {
return cb(new Error('Invalid key'))
}

createReadStream(key, 'data')
.pipe(bl((err, data) => {
if (err) {
return cb(err)
}

lock(path, (release) => {
const ws = store.createWriteStream(path, release(cb))
through.pipe(ws)
})
cb(null, new Block(data, extension))
}))
},

put: (block, cb) => {
if (!block || !block.data) {
return cb(new Error('Invalid block'))
}

return through
const ws = createWriteStream(block.key, block.extension, cb)
ws.write(block.data)
ws.end()
},

exists: (multihash, extension, cb) => {
has: (key, extension, cb) => {
if (typeof extension === 'function') {
cb = extension
extension = undefined
}

const path = multihashToPath(multihash, extension)
return store.exists(path, cb)
const path = multihashToPath(key, extension)
store.exists(path, cb)
},

remove: (multihash, extension, cb) => {
delete: (key, extension, cb) => {
if (typeof extension === 'function') {
cb = extension
extension = undefined
}

const path = multihashToPath(multihash, extension)
return store.remove(path, cb)
const path = multihashToPath(key, extension)
store.remove(path, cb)
}
}
}
207 changes: 110 additions & 97 deletions test/repo-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,7 @@

const Repo = require('../src/index')
const expect = require('chai').expect
const base58 = require('bs58')
const bl = require('bl')
const fs = require('fs')
const join = require('path').join

const fileA = fs.readFileSync(join(__dirname, 'test-repo/blocks/12207028/122070286b9afa6620a66f715c7020d68af3d10e1a497971629c07606bfdb812303d.data'))

const fileAExt = fs.readFileSync(join(__dirname, 'test-repo/blocks/12207028/122070286b9afa6620a66f715c7020d68af3d10e1a497971629c07606bfdb812303d.ext'))
const Block = require('ipfs-block')

module.exports = function (repo) {
describe('IPFS Repo Tests', function () {
Expand Down Expand Up @@ -138,127 +131,147 @@ module.exports = function (repo) {
})

describe('datastore', function () {
const baseFileHash = 'QmVtU7ths96fMgZ8YSZAbKghyieq7AjxNdcqyVzxTt3qVe'
const baseExtFileHash = 'QmVtU7ths96fMgZ8YSZAbKghyieq7AjxNdcqyVzxTt3qVe'
const helloKey = '1220b94d/1220b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9.data'
const helloIpldKey = '1220b94d/1220b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9.ipld'

it('reads block', function (done) {
const buf = new Buffer(base58.decode(baseFileHash))
repo.datastore.createReadStream(buf)
.pipe(bl((err, data) => {
describe('.put', () => {
it('simple', function (done) {
const b = new Block('hello world')
repo.datastore.put(b, (err, meta) => {
expect(err).to.not.exist
expect(data.equals(fileA)).to.equal(true)
expect(meta.key).to.be.eql(helloKey)
done()
}))
})
})
})

it('reads block, with custom extension', function (done) {
const buf = new Buffer(base58.decode(baseFileHash))
repo.datastore.createReadStream(buf, 'ext')
.pipe(bl((err, data) => {
expect(err).to.not.exist
expect(data.equals(fileAExt)).to.equal(true)
done()
}))
})
it('multi write (locks)', (done) => {
const b = new Block('hello world')

it('write a block', function (done) {
const rnd = 'QmVtU7ths96fMgZ8YSZAbKghyieq7AjxNdcqyVtesthash'
const mh = new Buffer(base58.decode(rnd))
const data = new Buffer('Oh the data')
let i = 0
const finish = () => {
i++
if (i === 2) done()
}

repo.datastore.createWriteStream(mh, (err, metadata) => {
expect(err).to.not.exist
expect(metadata.key).to.equal('12207028/122070286b9afa6620a66f715c7020d68af3d10e1a497971629c07605f55537ce990.data')
done()
}).end(data)
})
repo.datastore.put(b, (err, meta) => {
expect(err).to.not.exist
expect(meta.key).to.equal(helloKey)
finish()
})

it('write a block with custom extension', function (done) {
const rnd = 'QmVtU7ths96fMgZ8YSZAbKghyieq7AjxNdcqyVtesthash'
const mh = new Buffer(base58.decode(rnd))
const data = new Buffer('Oh the data')
repo.datastore.put(b, (err, meta) => {
expect(err).to.not.exist
expect(meta.key).to.equal(helloKey)
finish()
})
})

repo.datastore.createWriteStream(mh, 'ext', (err, metadata) => {
expect(err).to.not.exist
expect(metadata.key).to.equal('12207028/122070286b9afa6620a66f715c7020d68af3d10e1a497971629c07605f55537ce990.ext')
done()
}).end(data)
it('custom extension', function (done) {
const b = new Block('hello world', 'ipld')
repo.datastore.put(b, (err, meta) => {
expect(err).to.not.exist
expect(meta.key).to.be.eql(helloIpldKey)
done()
})
})

it('returns an error on invalid block', (done) => {
repo.datastore.put('hello', (err) => {
expect(err.message).to.be.eql('Invalid block')
done()
})
})
})

it('write locks', (done) => {
const rnd = 'QmVtU7ths96fMgZ8YSZAbKghyieq7AjxNdcqyVtesthash'
const mh = new Buffer(base58.decode(rnd))
const data = new Buffer('Oh the data')
describe('.get', () => {
it('simple', (done) => {
const b = new Block('hello world')

let i = 0
const finish = () => {
i++
if (i === 2) done()
}
repo.datastore.get(b.key, (err, data) => {
expect(err).to.not.exist
expect(data).to.be.eql(b)

repo.datastore.createWriteStream(mh, (err, metadata) => {
expect(err).to.not.exist
expect(metadata.key).to.equal('12207028/122070286b9afa6620a66f715c7020d68af3d10e1a497971629c07605f55537ce990.data')
finish()
}).end(data)
done()
})
})

repo.datastore.createWriteStream(mh, (err, metadata) => {
expect(err).to.not.exist
expect(metadata.key).to.equal('12207028/122070286b9afa6620a66f715c7020d68af3d10e1a497971629c07605f55537ce990.data')
finish()
}).end(data)
})
it('custom extension', (done) => {
const b = new Block('hello world', 'ipld')

it('block exists', function (done) {
const buf = new Buffer(base58.decode(baseFileHash))
repo.datastore.get(b.key, b.extension, (err, data) => {
expect(err).to.not.exist
expect(data).to.be.eql(b)

repo.datastore.exists(buf, (err, exists) => {
expect(err).to.not.exist
expect(exists).to.equal(true)
done()
done()
})
})

it('returns an error on invalid block', (done) => {
repo.datastore.get(null, (err) => {
expect(err.message).to.be.eql('Invalid key')
done()
})
})
})

it('block exists, with custom extension', function (done) {
const buf = new Buffer(base58.decode(baseExtFileHash))
describe('.has', () => {
it('existing block', (done) => {
const b = new Block('hello world')

repo.datastore.exists(buf, 'ext', (err, exists) => {
expect(err).to.not.exist
expect(exists).to.equal(true)
done()
repo.datastore.has(b.key, (err, exists) => {
expect(err).to.not.exist
expect(exists).to.equal(true)
done()
})
})
})

it('check for non existent block', function (done) {
const buf = new Buffer('random buffer')
it('with extension', (done) => {
const b = new Block('hello world')

repo.datastore.exists(buf, (err, exists) => {
expect(err).to.not.exist
expect(exists).to.equal(false)
done()
repo.datastore.has(b.key, 'data', (err, exists) => {
expect(err).to.not.exist
expect(exists).to.equal(true)
done()
})
})
})

it('remove a block', function (done) {
const buf = new Buffer(base58.decode(baseFileHash))
repo.datastore.remove(buf, (err) => {
expect(err).to.not.exist
repo.datastore.exists(buf, (err, exists) => {
it('non existent block', (done) => {
const b = new Block('wooot')

repo.datastore.has(b.key, (err, exists) => {
expect(err).to.not.exist
expect(exists).to.equal(false)
done()
})
})
})

it('remove a block, with custom extension', function (done) {
const buf = new Buffer(base58.decode(baseExtFileHash))
repo.datastore.remove(buf, 'ext', (err) => {
expect(err).to.not.exist
repo.datastore.exists(buf, 'ext', (err, exists) => {
describe('.delete', () => {
it('simple', (done) => {
const b = new Block('hello world')

repo.datastore.delete(b.key, (err) => {
expect(err).to.not.exist
expect(exists).to.equal(false)
done()

repo.datastore.has(b.key, (err, exists) => {
expect(err).to.not.exist
expect(exists).to.equal(false)
done()
})
})
})

it('custom extension', (done) => {
const b = new Block('hello world', 'ipld')

repo.datastore.delete(b.key, b.extension, (err) => {
expect(err).to.not.exist

repo.datastore.has(b.key, b.extension, (err, exists) => {
expect(err).to.not.exist
expect(exists).to.equal(false)
done()
})
})
})
})
Expand Down

0 comments on commit 9229e57

Please sign in to comment.