diff --git a/package.json b/package.json index a32cd13d41..216fc76085 100644 --- a/package.json +++ b/package.json @@ -99,7 +99,7 @@ "ipfs-bitswap": "^0.26.0", "ipfs-block": "~0.8.1", "ipfs-block-service": "~0.16.0", - "ipfs-http-client": "^38.1.0", + "ipfs-http-client": "^38.2.0", "ipfs-http-response": "~0.3.1", "ipfs-mfs": "^0.13.0", "ipfs-multipart": "^0.2.0", @@ -204,7 +204,7 @@ "form-data": "^2.5.1", "hat": "0.0.3", "interface-ipfs-core": "^0.117.2", - "ipfs-interop": "~0.1.0", + "ipfs-interop": "^0.1.1", "ipfsd-ctl": "^0.47.2", "libp2p-websocket-star": "~0.10.2", "ncp": "^2.0.0", diff --git a/src/cli/commands/block/rm.js b/src/cli/commands/block/rm.js index e7640ec83d..1f92ed1a06 100644 --- a/src/cli/commands/block/rm.js +++ b/src/cli/commands/block/rm.js @@ -1,22 +1,46 @@ 'use strict' module.exports = { - command: 'rm ', + command: 'rm ', - describe: 'Remove a raw IPFS block', + describe: 'Remove IPFS block(s)', - builder: {}, + builder: { + force: { + alias: 'f', + describe: 'Ignore nonexistent blocks', + type: 'boolean', + default: false + }, + quiet: { + alias: 'q', + describe: 'Write minimal output', + type: 'boolean', + default: false + } + }, - handler ({ getIpfs, print, isDaemonOn, key, resolve }) { + handler ({ getIpfs, print, hash, force, quiet, resolve }) { resolve((async () => { - if (isDaemonOn()) { - // TODO implement this once `js-ipfs-http-client` supports it - throw new Error('rm block with daemon running is not yet implemented') + const ipfs = await getIpfs() + let errored = false + + for await (const result of ipfs.block._rmAsyncIterator(hash, { + force, + quiet + })) { + if (result.error) { + errored = true + } + + if (!quiet) { + print(result.error || 'removed ' + result.hash) + } } - const ipfs = await getIpfs() - await ipfs.block.rm(key) - print('removed ' + key) + if (errored && !quiet) { + throw new Error('some blocks not removed') + } })()) } } diff --git a/src/core/components/block.js b/src/core/components/block.js index 205dcedcdc..87b00fc52c 100644 --- a/src/core/components/block.js +++ b/src/core/components/block.js @@ -5,17 +5,67 @@ const multihashing = require('multihashing-async') const CID = require('cids') const callbackify = require('callbackify') const errCode = require('err-code') +const all = require('async-iterator-all') +const { PinTypes } = require('./pin/pin-manager') module.exports = function block (self) { - return { - get: callbackify.variadic(async (cid, options) => { // eslint-disable-line require-await - options = options || {} + async function * rmAsyncIterator (cids, options) { + options = options || {} - try { + if (!Array.isArray(cids)) { + cids = [cids] + } + + // We need to take a write lock here to ensure that adding and removing + // blocks are exclusive operations + const release = await self._gcLock.writeLock() + + try { + for (let cid of cids) { cid = cleanCid(cid) - } catch (err) { - throw errCode(err, 'ERR_INVALID_CID') + + const result = { + hash: cid.toString() + } + + try { + const pinResult = await self.pin.pinManager.isPinnedWithType(cid, PinTypes.all) + + if (pinResult.pinned) { + if (CID.isCID(pinResult.reason)) { // eslint-disable-line max-depth + throw errCode(new Error(`pinned via ${pinResult.reason}`)) + } + + throw errCode(new Error(`pinned: ${pinResult.reason}`)) + } + + // remove has check when https://github.com/ipfs/js-ipfs-block-service/pull/88 is merged + const has = await self._blockService._repo.blocks.has(cid) + + if (!has) { + throw errCode(new Error('block not found'), 'ERR_BLOCK_NOT_FOUND') + } + + await self._blockService.delete(cid) + } catch (err) { + if (!options.force) { + result.error = `cannot remove ${cid}: ${err.message}` + } + } + + if (!options.quiet) { + yield result + } } + } finally { + release() + } + } + + return { + get: callbackify.variadic(async (cid, options) => { // eslint-disable-line require-await + options = options || {} + cid = cleanCid(cid) if (options.preload !== false) { self._preload(cid) @@ -66,31 +116,13 @@ module.exports = function block (self) { release() } }), - rm: callbackify(async (cid) => { - try { - cid = cleanCid(cid) - } catch (err) { - throw errCode(err, 'ERR_INVALID_CID') - } - - // We need to take a write lock here to ensure that adding and removing - // blocks are exclusive operations - const release = await self._gcLock.writeLock() - - try { - await self._blockService.delete(cid) - } finally { - release() - } + rm: callbackify.variadic(async (cids, options) => { // eslint-disable-line require-await + return all(rmAsyncIterator(cids, options)) }), + _rmAsyncIterator: rmAsyncIterator, stat: callbackify.variadic(async (cid, options) => { options = options || {} - - try { - cid = cleanCid(cid) - } catch (err) { - throw errCode(err, 'ERR_INVALID_CID') - } + cid = cleanCid(cid) if (options.preload !== false) { self._preload(cid) @@ -112,5 +144,9 @@ function cleanCid (cid) { } // CID constructor knows how to do the cleaning :) - return new CID(cid) + try { + return new CID(cid) + } catch (err) { + throw errCode(err, 'ERR_INVALID_CID') + } } diff --git a/src/http/api/resources/block.js b/src/http/api/resources/block.js index d72f4b78bc..c88b25b15f 100644 --- a/src/http/api/resources/block.js +++ b/src/http/api/resources/block.js @@ -8,6 +8,7 @@ const Boom = require('@hapi/boom') const { cidToString } = require('../../../utils/cid') const debug = require('debug') const all = require('async-iterator-all') +const streamResponse = require('../../utils/stream-response') const log = debug('ipfs:http-api:block') log.error = debug('ipfs:http-api:block:error') @@ -102,20 +103,48 @@ exports.put = { } exports.rm = { - // uses common parseKey method that returns a `key` - parseArgs: exports.parseKey, + validate: { + query: Joi.object().keys({ + arg: Joi.array().items(Joi.string()).single().required(), + force: Joi.boolean().default(false), + quiet: Joi.boolean().default(false) + }).unknown() + }, - // main route handler which is called after the above `parseArgs`, but only if the args were valid - async handler (request, h) { - const { key } = request.pre.args + parseArgs: (request, h) => { + let { arg } = request.query try { - await request.server.app.ipfs.block.rm(key) + arg = arg.map(thing => new CID(thing)) } catch (err) { - throw Boom.boomify(err, { message: 'Failed to delete block' }) + throw Boom.badRequest('Not a valid hash') + } + + return { + ...request.query, + arg } + }, - return h.response() + // main route handler which is called after the above `parseArgs`, but only if the args were valid + handler (request, h) { + const { arg, force, quiet } = request.pre.args + + return streamResponse(request, h, async (output) => { + try { + for await (const result of request.server.app.ipfs.block._rmAsyncIterator(arg, { + force, + quiet + })) { + output.write(JSON.stringify({ + Hash: result.hash, + Error: result.error + }) + '\n') + } + } catch (err) { + throw Boom.boomify(err, { message: 'Failed to delete block' }) + } + }) } } diff --git a/src/http/api/routes/block.js b/src/http/api/routes/block.js index 1e25b98543..ae45ce8bed 100644 --- a/src/http/api/routes/block.js +++ b/src/http/api/routes/block.js @@ -31,10 +31,11 @@ module.exports = [ { method: '*', path: '/api/v0/block/rm', - config: { + options: { pre: [ { method: resources.block.rm.parseArgs, assign: 'args' } - ] + ], + validate: resources.block.rm.validate }, handler: resources.block.rm.handler }, diff --git a/src/http/utils/stream-response.js b/src/http/utils/stream-response.js new file mode 100644 index 0000000000..a029daaaa9 --- /dev/null +++ b/src/http/utils/stream-response.js @@ -0,0 +1,29 @@ +'use strict' + +const { PassThrough } = require('readable-stream') + +function streamResponse (request, h, fn) { + const output = new PassThrough() + const errorTrailer = 'X-Stream-Error' + + Promise.resolve() + .then(() => fn(output)) + .catch(err => { + request.raw.res.addTrailers({ + [errorTrailer]: JSON.stringify({ + Message: err.message, + Code: 0 + }) + }) + }) + .finally(() => { + output.end() + }) + + return h.response(output) + .header('x-chunked-output', '1') + .header('content-type', 'application/json') + .header('Trailer', errorTrailer) +} + +module.exports = streamResponse diff --git a/test/cli/block.js b/test/cli/block.js index ee0d47b86f..3304c8fcb9 100644 --- a/test/cli/block.js +++ b/test/cli/block.js @@ -72,10 +72,36 @@ describe('block', () => runOnAndOff((thing) => { ].join('\n') + '\n') }) - it.skip('rm', async function () { + it('rm', async function () { this.timeout(40 * 1000) + await ipfs('block put test/fixtures/test-data/hello') + const out = await ipfs('block rm QmZjTnYw2TFhn9Nn7tjmPSoTBoY7YRkwPzwSrSbabY24Kp') expect(out).to.eql('removed QmZjTnYw2TFhn9Nn7tjmPSoTBoY7YRkwPzwSrSbabY24Kp\n') }) + + it('rm quietly', async function () { + this.timeout(40 * 1000) + + await ipfs('block put test/fixtures/test-data/hello') + + const out = await ipfs('block rm --quiet QmZjTnYw2TFhn9Nn7tjmPSoTBoY7YRkwPzwSrSbabY24Kp') + expect(out).to.eql('') + }) + + it('rm force', async function () { + this.timeout(40 * 1000) + + const out = await ipfs('block rm --force QmZjTnYw2TFhn9Nn7tjmPSoTBoY7YRkwPzwSrSbabY24Kh') + expect(out).to.eql('removed QmZjTnYw2TFhn9Nn7tjmPSoTBoY7YRkwPzwSrSbabY24Kh\n') + }) + + it('fails to remove non-existent block', async function () { + this.timeout(40 * 1000) + + const out = await ipfs.fail('block rm QmZjTnYw2TFhn9Nn7tjmPSoTBoY7YRkwPzwSrSbabY24Kh') + expect(out.stdout).to.include('block not found') + expect(out.stdout).to.include('some blocks not removed') + }) })) diff --git a/test/core/interface.spec.js b/test/core/interface.spec.js index 5ac674782c..0ffe937ab1 100644 --- a/test/core/interface.spec.js +++ b/test/core/interface.spec.js @@ -12,12 +12,7 @@ describe('interface-ipfs-core tests', function () { tests.bitswap(defaultCommonFactory, { skip: !isNode }) - tests.block(defaultCommonFactory, { - skip: [{ - name: 'rm', - reason: 'Not implemented' - }] - }) + tests.block(defaultCommonFactory) tests.bootstrap(defaultCommonFactory) diff --git a/test/http-api/interface.js b/test/http-api/interface.js index 7469be1564..c73a5d0c6e 100644 --- a/test/http-api/interface.js +++ b/test/http-api/interface.js @@ -12,12 +12,7 @@ describe('interface-ipfs-core over ipfs-http-client tests', () => { tests.bitswap(defaultCommonFactory) - tests.block(defaultCommonFactory, { - skip: [{ - name: 'rm', - reason: 'Not implemented' - }] - }) + tests.block(defaultCommonFactory) tests.bootstrap(defaultCommonFactory)