diff --git a/doc/API.md b/doc/API.md index 96fc9fc62b..fd2cd82452 100644 --- a/doc/API.md +++ b/doc/API.md @@ -12,6 +12,9 @@ * [`handle`](#handle) * [`unhandle`](#unhandle) * [`ping`](#ping) + * [`fetch`](#fetch) + * [`fetchService.registerLookupFunction`](#fetchserviceregisterlookupfunction) + * [`fetchService.unRegisterLookupFunction`](#fetchserviceunregisterlookupfunction) * [`multiaddrs`](#multiaddrs) * [`addressManager.getListenAddrs`](#addressmanagergetlistenaddrs) * [`addressManager.getAnnounceAddrs`](#addressmanagergetannounceaddrs) @@ -455,6 +458,72 @@ Pings a given peer and get the operation's latency. const latency = await libp2p.ping(otherPeerId) ``` +## fetch + +Fetch a value from a remote node + +`libp2p.fetch(peer, key)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| peer | [`PeerId`][peer-id]\|[`Multiaddr`][multiaddr]\|`string` | peer to ping | +| key | `string` | A key that corresponds to a value on the remote node | + +#### Returns + +| Type | Description | +|------|-------------| +| `Promise` | The value for the key or null if it cannot be found | + +#### Example + +```js +// ... +const value = await libp2p.fetch(otherPeerId, '/some/key') +``` + +## fetchService.registerLookupFunction + +Register a function to look up values requested by remote nodes + +`libp2p.fetchService.registerLookupFunction(prefix, lookup)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| prefix | `string` | All queries below this prefix will be passed to the lookup function | +| lookup | `(key: string) => Promise` | A function that takes a key and returns a Uint8Array or null | + +#### Example + +```js +// ... +const value = await libp2p.fetchService.registerLookupFunction('/prefix', (key) => { ... }) +``` + +## fetchService.unregisterLookupFunction + +Removes the passed lookup function or any function registered for the passed prefix + +`libp2p.fetchService.unregisterLookupFunction(prefix, lookup)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| prefix | `string` | All queries below this prefix will be passed to the lookup function | +| lookup | `(key: string) => Promise` | Optional: A function that takes a key and returns a Uint8Array or null | + +#### Example + +```js +// ... +libp2p.fetchService.unregisterLookupFunction('/prefix') +``` + ## multiaddrs Gets the multiaddrs the libp2p node announces to the network. This computes the advertising multiaddrs diff --git a/package.json b/package.json index c78c349c67..357ddd9213 100644 --- a/package.json +++ b/package.json @@ -20,15 +20,17 @@ "scripts": { "lint": "aegir lint", "build": "aegir build", - "build:proto": "npm run build:proto:circuit && npm run build:proto:identify && npm run build:proto:plaintext && npm run build:proto:address-book && npm run build:proto:proto-book && npm run build:proto:peer && npm run build:proto:peer-record && npm run build:proto:envelope", + "build:proto": "npm run build:proto:circuit && npm run build:proto:fetch && npm run build:proto:identify && npm run build:proto:plaintext && npm run build:proto:address-book && npm run build:proto:proto-book && npm run build:proto:peer && npm run build:proto:peer-record && npm run build:proto:envelope", "build:proto:circuit": "pbjs -t static-module -w commonjs -r libp2p-circuit --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/circuit/protocol/index.js ./src/circuit/protocol/index.proto", + "build:proto:fetch": "pbjs -t static-module -w commonjs -r libp2p-fetch --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/fetch/proto.js ./src/fetch/proto.proto", "build:proto:identify": "pbjs -t static-module -w commonjs -r libp2p-identify --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/identify/message.js ./src/identify/message.proto", "build:proto:plaintext": "pbjs -t static-module -w commonjs -r libp2p-plaintext --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/insecure/proto.js ./src/insecure/proto.proto", "build:proto:peer": "pbjs -t static-module -w commonjs -r libp2p-peer --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/peer-store/pb/peer.js ./src/peer-store/pb/peer.proto", "build:proto:peer-record": "pbjs -t static-module -w commonjs -r libp2p-peer-record --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/record/peer-record/peer-record.js ./src/record/peer-record/peer-record.proto", "build:proto:envelope": "pbjs -t static-module -w commonjs -r libp2p-envelope --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/record/envelope/envelope.js ./src/record/envelope/envelope.proto", - "build:proto-types": "npm run build:proto-types:circuit && npm run build:proto-types:identify && npm run build:proto-types:plaintext && npm run build:proto-types:address-book && npm run build:proto-types:proto-book && npm run build:proto-types:peer && npm run build:proto-types:peer-record && npm run build:proto-types:envelope", + "build:proto-types": "npm run build:proto-types:circuit && npm run build:proto-types:fetch && npm run build:proto-types:identify && npm run build:proto-types:plaintext && npm run build:proto-types:address-book && npm run build:proto-types:proto-book && npm run build:proto-types:peer && npm run build:proto-types:peer-record && npm run build:proto-types:envelope", "build:proto-types:circuit": "pbts -o src/circuit/protocol/index.d.ts src/circuit/protocol/index.js", + "build:proto-types:fetch": "pbts -o src/fetch/proto.d.ts src/fetch/proto.js", "build:proto-types:identify": "pbts -o src/identify/message.d.ts src/identify/message.js", "build:proto-types:plaintext": "pbts -o src/insecure/proto.d.ts src/insecure/proto.js", "build:proto-types:peer": "pbts -o src/peer-store/pb/peer.d.ts src/peer-store/pb/peer.js", diff --git a/src/fetch/README.md b/src/fetch/README.md new file mode 100644 index 0000000000..7ea9997a5e --- /dev/null +++ b/src/fetch/README.md @@ -0,0 +1,36 @@ +libp2p-fetch JavaScript Implementation +===================================== + +> Libp2p fetch protocol JavaScript implementation + +## Overview + +An implementation of the Fetch protocol as described here: https://github.com/libp2p/specs/tree/master/fetch + +The fetch protocol is a simple protocol for requesting a value corresponding to a key from a peer. + +## Usage + +```javascript +const Libp2p = require('libp2p') + +/** + * Given a key (as a string) returns a value (as a Uint8Array), or null if the key isn't found. + * All keys must be prefixed my the same prefix, which will be used to find the appropriate key + * lookup function. + * @param key - a string + * @returns value - a Uint8Array value that corresponds to the given key, or null if the key doesn't + * have a corresponding value. + */ +async function my_subsystem_key_lookup(key) { + // app specific callback to lookup key-value pairs. +} + +// Enable this peer to respond to fetch requests for keys that begin with '/my_subsystem_key_prefix/' +const libp2p = Libp2p.create(...) +libp2p.fetchService.registerLookupFunction('/my_subsystem_key_prefix/', my_subsystem_key_lookup) + +const key = '/my_subsystem_key_prefix/{...}' +const peerDst = PeerId.parse('Qmfoo...') // or Multiaddr instance +const value = await libp2p.fetch(peerDst, key) +``` diff --git a/src/fetch/constants.js b/src/fetch/constants.js new file mode 100644 index 0000000000..2c1044e535 --- /dev/null +++ b/src/fetch/constants.js @@ -0,0 +1,6 @@ +'use strict' + +module.exports = { + // https://github.com/libp2p/specs/tree/master/fetch#wire-protocol + PROTOCOL: '/libp2p/fetch/0.0.1' +} diff --git a/src/fetch/index.js b/src/fetch/index.js new file mode 100644 index 0000000000..ef8c37f153 --- /dev/null +++ b/src/fetch/index.js @@ -0,0 +1,159 @@ +'use strict' + +const debug = require('debug') +const log = Object.assign(debug('libp2p:fetch'), { + error: debug('libp2p:fetch:err') +}) +const errCode = require('err-code') +const { codes } = require('../errors') +const lp = require('it-length-prefixed') +const { FetchRequest, FetchResponse } = require('./proto') +// @ts-ignore it-handshake does not export types +const handshake = require('it-handshake') +const { PROTOCOL } = require('./constants') + +/** + * @typedef {import('../')} Libp2p + * @typedef {import('multiaddr').Multiaddr} Multiaddr + * @typedef {import('peer-id')} PeerId + * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream + * @typedef {(key: string) => Promise} LookupFunction + */ + +/** + * A simple libp2p protocol for requesting a value corresponding to a key from a peer. + * Developers can register one or more lookup function for retrieving the value corresponding to + * a given key. Each lookup function must act on a distinct part of the overall key space, defined + * by a fixed prefix that all keys that should be routed to that lookup function will start with. + */ +class FetchProtocol { + /** + * @param {Libp2p} libp2p + */ + constructor (libp2p) { + this._lookupFunctions = new Map() // Maps key prefix to value lookup function + this._libp2p = libp2p + this.handleMessage = this.handleMessage.bind(this) + } + + /** + * Sends a request to fetch the value associated with the given key from the given peer. + * + * @param {PeerId|Multiaddr} peer + * @param {string} key + * @returns {Promise} + */ + async fetch (peer, key) { + // @ts-ignore multiaddr might not have toB58String + log('dialing %s to %s', this._protocol, peer.toB58String ? peer.toB58String() : peer) + + const connection = await this._libp2p.dial(peer) + const { stream } = await connection.newStream(FetchProtocol.PROTOCOL) + const shake = handshake(stream) + + // send message + const request = new FetchRequest({ identifier: key }) + shake.write(lp.encode.single(FetchRequest.encode(request).finish())) + + // read response + const response = FetchResponse.decode((await lp.decode.fromReader(shake.reader).next()).value.slice()) + switch (response.status) { + case (FetchResponse.StatusCode.OK): { + return response.data + } + case (FetchResponse.StatusCode.NOT_FOUND): { + return null + } + case (FetchResponse.StatusCode.ERROR): { + const errmsg = (new TextDecoder()).decode(response.data) + throw errCode(new Error('Error in fetch protocol response: ' + errmsg), codes.ERR_INVALID_PARAMETERS) + } + default: { + throw errCode(new Error('Unknown response status'), codes.ERR_INVALID_MESSAGE) + } + } + } + + /** + * Invoked when a fetch request is received. Reads the request message off the given stream and + * responds based on looking up the key in the request via the lookup callback that corresponds + * to the key's prefix. + * + * @param {object} options + * @param {MuxedStream} options.stream + * @param {string} options.protocol + */ + async handleMessage (options) { + const { stream } = options + const shake = handshake(stream) + const request = FetchRequest.decode((await lp.decode.fromReader(shake.reader).next()).value.slice()) + + let response + const lookup = this._getLookupFunction(request.identifier) + if (lookup) { + const data = await lookup(request.identifier) + if (data) { + response = new FetchResponse({ status: FetchResponse.StatusCode.OK, data }) + } else { + response = new FetchResponse({ status: FetchResponse.StatusCode.NOT_FOUND }) + } + } else { + const errmsg = (new TextEncoder()).encode('No lookup function registered for key: ' + request.identifier) + response = new FetchResponse({ status: FetchResponse.StatusCode.ERROR, data: errmsg }) + } + + shake.write(lp.encode.single(FetchResponse.encode(response).finish())) + } + + /** + * Given a key, finds the appropriate function for looking up its corresponding value, based on + * the key's prefix. + * + * @param {string} key + */ + _getLookupFunction (key) { + for (const prefix of this._lookupFunctions.keys()) { + if (key.startsWith(prefix)) { + return this._lookupFunctions.get(prefix) + } + } + return null + } + + /** + * Registers a new lookup callback that can map keys to values, for a given set of keys that + * share the same prefix. + * + * @param {string} prefix + * @param {LookupFunction} lookup + */ + registerLookupFunction (prefix, lookup) { + if (this._lookupFunctions.has(prefix)) { + throw errCode(new Error("Fetch protocol handler for key prefix '" + prefix + "' already registered"), codes.ERR_KEY_ALREADY_EXISTS) + } + this._lookupFunctions.set(prefix, lookup) + } + + /** + * Registers a new lookup callback that can map keys to values, for a given set of keys that + * share the same prefix. + * + * @param {string} prefix + * @param {LookupFunction} [lookup] + */ + unregisterLookupFunction (prefix, lookup) { + if (lookup != null) { + const existingLookup = this._lookupFunctions.get(prefix) + + if (existingLookup !== lookup) { + return + } + } + + this._lookupFunctions.delete(prefix) + } +} + +FetchProtocol.PROTOCOL = PROTOCOL + +exports = module.exports = FetchProtocol diff --git a/src/fetch/proto.d.ts b/src/fetch/proto.d.ts new file mode 100644 index 0000000000..bf022f516c --- /dev/null +++ b/src/fetch/proto.d.ts @@ -0,0 +1,134 @@ +import * as $protobuf from "protobufjs"; +/** Properties of a FetchRequest. */ +export interface IFetchRequest { + + /** FetchRequest identifier */ + identifier?: (string|null); +} + +/** Represents a FetchRequest. */ +export class FetchRequest implements IFetchRequest { + + /** + * Constructs a new FetchRequest. + * @param [p] Properties to set + */ + constructor(p?: IFetchRequest); + + /** FetchRequest identifier. */ + public identifier: string; + + /** + * Encodes the specified FetchRequest message. Does not implicitly {@link FetchRequest.verify|verify} messages. + * @param m FetchRequest message or plain object to encode + * @param [w] Writer to encode to + * @returns Writer + */ + public static encode(m: IFetchRequest, w?: $protobuf.Writer): $protobuf.Writer; + + /** + * Decodes a FetchRequest message from the specified reader or buffer. + * @param r Reader or buffer to decode from + * @param [l] Message length if known beforehand + * @returns FetchRequest + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): FetchRequest; + + /** + * Creates a FetchRequest message from a plain object. Also converts values to their respective internal types. + * @param d Plain object + * @returns FetchRequest + */ + public static fromObject(d: { [k: string]: any }): FetchRequest; + + /** + * Creates a plain object from a FetchRequest message. Also converts values to other types if specified. + * @param m FetchRequest + * @param [o] Conversion options + * @returns Plain object + */ + public static toObject(m: FetchRequest, o?: $protobuf.IConversionOptions): { [k: string]: any }; + + /** + * Converts this FetchRequest to JSON. + * @returns JSON object + */ + public toJSON(): { [k: string]: any }; +} + +/** Properties of a FetchResponse. */ +export interface IFetchResponse { + + /** FetchResponse status */ + status?: (FetchResponse.StatusCode|null); + + /** FetchResponse data */ + data?: (Uint8Array|null); +} + +/** Represents a FetchResponse. */ +export class FetchResponse implements IFetchResponse { + + /** + * Constructs a new FetchResponse. + * @param [p] Properties to set + */ + constructor(p?: IFetchResponse); + + /** FetchResponse status. */ + public status: FetchResponse.StatusCode; + + /** FetchResponse data. */ + public data: Uint8Array; + + /** + * Encodes the specified FetchResponse message. Does not implicitly {@link FetchResponse.verify|verify} messages. + * @param m FetchResponse message or plain object to encode + * @param [w] Writer to encode to + * @returns Writer + */ + public static encode(m: IFetchResponse, w?: $protobuf.Writer): $protobuf.Writer; + + /** + * Decodes a FetchResponse message from the specified reader or buffer. + * @param r Reader or buffer to decode from + * @param [l] Message length if known beforehand + * @returns FetchResponse + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): FetchResponse; + + /** + * Creates a FetchResponse message from a plain object. Also converts values to their respective internal types. + * @param d Plain object + * @returns FetchResponse + */ + public static fromObject(d: { [k: string]: any }): FetchResponse; + + /** + * Creates a plain object from a FetchResponse message. Also converts values to other types if specified. + * @param m FetchResponse + * @param [o] Conversion options + * @returns Plain object + */ + public static toObject(m: FetchResponse, o?: $protobuf.IConversionOptions): { [k: string]: any }; + + /** + * Converts this FetchResponse to JSON. + * @returns JSON object + */ + public toJSON(): { [k: string]: any }; +} + +export namespace FetchResponse { + + /** StatusCode enum. */ + enum StatusCode { + OK = 0, + NOT_FOUND = 1, + ERROR = 2 + } +} diff --git a/src/fetch/proto.js b/src/fetch/proto.js new file mode 100644 index 0000000000..f7de2b1dd5 --- /dev/null +++ b/src/fetch/proto.js @@ -0,0 +1,333 @@ +/*eslint-disable*/ +"use strict"; + +var $protobuf = require("protobufjs/minimal"); + +// Common aliases +var $Reader = $protobuf.Reader, $Writer = $protobuf.Writer, $util = $protobuf.util; + +// Exported root namespace +var $root = $protobuf.roots["libp2p-fetch"] || ($protobuf.roots["libp2p-fetch"] = {}); + +$root.FetchRequest = (function() { + + /** + * Properties of a FetchRequest. + * @exports IFetchRequest + * @interface IFetchRequest + * @property {string|null} [identifier] FetchRequest identifier + */ + + /** + * Constructs a new FetchRequest. + * @exports FetchRequest + * @classdesc Represents a FetchRequest. + * @implements IFetchRequest + * @constructor + * @param {IFetchRequest=} [p] Properties to set + */ + function FetchRequest(p) { + if (p) + for (var ks = Object.keys(p), i = 0; i < ks.length; ++i) + if (p[ks[i]] != null) + this[ks[i]] = p[ks[i]]; + } + + /** + * FetchRequest identifier. + * @member {string} identifier + * @memberof FetchRequest + * @instance + */ + FetchRequest.prototype.identifier = ""; + + /** + * Encodes the specified FetchRequest message. Does not implicitly {@link FetchRequest.verify|verify} messages. + * @function encode + * @memberof FetchRequest + * @static + * @param {IFetchRequest} m FetchRequest message or plain object to encode + * @param {$protobuf.Writer} [w] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + FetchRequest.encode = function encode(m, w) { + if (!w) + w = $Writer.create(); + if (m.identifier != null && Object.hasOwnProperty.call(m, "identifier")) + w.uint32(10).string(m.identifier); + return w; + }; + + /** + * Decodes a FetchRequest message from the specified reader or buffer. + * @function decode + * @memberof FetchRequest + * @static + * @param {$protobuf.Reader|Uint8Array} r Reader or buffer to decode from + * @param {number} [l] Message length if known beforehand + * @returns {FetchRequest} FetchRequest + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + FetchRequest.decode = function decode(r, l) { + if (!(r instanceof $Reader)) + r = $Reader.create(r); + var c = l === undefined ? r.len : r.pos + l, m = new $root.FetchRequest(); + while (r.pos < c) { + var t = r.uint32(); + switch (t >>> 3) { + case 1: + m.identifier = r.string(); + break; + default: + r.skipType(t & 7); + break; + } + } + return m; + }; + + /** + * Creates a FetchRequest message from a plain object. Also converts values to their respective internal types. + * @function fromObject + * @memberof FetchRequest + * @static + * @param {Object.} d Plain object + * @returns {FetchRequest} FetchRequest + */ + FetchRequest.fromObject = function fromObject(d) { + if (d instanceof $root.FetchRequest) + return d; + var m = new $root.FetchRequest(); + if (d.identifier != null) { + m.identifier = String(d.identifier); + } + return m; + }; + + /** + * Creates a plain object from a FetchRequest message. Also converts values to other types if specified. + * @function toObject + * @memberof FetchRequest + * @static + * @param {FetchRequest} m FetchRequest + * @param {$protobuf.IConversionOptions} [o] Conversion options + * @returns {Object.} Plain object + */ + FetchRequest.toObject = function toObject(m, o) { + if (!o) + o = {}; + var d = {}; + if (o.defaults) { + d.identifier = ""; + } + if (m.identifier != null && m.hasOwnProperty("identifier")) { + d.identifier = m.identifier; + } + return d; + }; + + /** + * Converts this FetchRequest to JSON. + * @function toJSON + * @memberof FetchRequest + * @instance + * @returns {Object.} JSON object + */ + FetchRequest.prototype.toJSON = function toJSON() { + return this.constructor.toObject(this, $protobuf.util.toJSONOptions); + }; + + return FetchRequest; +})(); + +$root.FetchResponse = (function() { + + /** + * Properties of a FetchResponse. + * @exports IFetchResponse + * @interface IFetchResponse + * @property {FetchResponse.StatusCode|null} [status] FetchResponse status + * @property {Uint8Array|null} [data] FetchResponse data + */ + + /** + * Constructs a new FetchResponse. + * @exports FetchResponse + * @classdesc Represents a FetchResponse. + * @implements IFetchResponse + * @constructor + * @param {IFetchResponse=} [p] Properties to set + */ + function FetchResponse(p) { + if (p) + for (var ks = Object.keys(p), i = 0; i < ks.length; ++i) + if (p[ks[i]] != null) + this[ks[i]] = p[ks[i]]; + } + + /** + * FetchResponse status. + * @member {FetchResponse.StatusCode} status + * @memberof FetchResponse + * @instance + */ + FetchResponse.prototype.status = 0; + + /** + * FetchResponse data. + * @member {Uint8Array} data + * @memberof FetchResponse + * @instance + */ + FetchResponse.prototype.data = $util.newBuffer([]); + + /** + * Encodes the specified FetchResponse message. Does not implicitly {@link FetchResponse.verify|verify} messages. + * @function encode + * @memberof FetchResponse + * @static + * @param {IFetchResponse} m FetchResponse message or plain object to encode + * @param {$protobuf.Writer} [w] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + FetchResponse.encode = function encode(m, w) { + if (!w) + w = $Writer.create(); + if (m.status != null && Object.hasOwnProperty.call(m, "status")) + w.uint32(8).int32(m.status); + if (m.data != null && Object.hasOwnProperty.call(m, "data")) + w.uint32(18).bytes(m.data); + return w; + }; + + /** + * Decodes a FetchResponse message from the specified reader or buffer. + * @function decode + * @memberof FetchResponse + * @static + * @param {$protobuf.Reader|Uint8Array} r Reader or buffer to decode from + * @param {number} [l] Message length if known beforehand + * @returns {FetchResponse} FetchResponse + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + FetchResponse.decode = function decode(r, l) { + if (!(r instanceof $Reader)) + r = $Reader.create(r); + var c = l === undefined ? r.len : r.pos + l, m = new $root.FetchResponse(); + while (r.pos < c) { + var t = r.uint32(); + switch (t >>> 3) { + case 1: + m.status = r.int32(); + break; + case 2: + m.data = r.bytes(); + break; + default: + r.skipType(t & 7); + break; + } + } + return m; + }; + + /** + * Creates a FetchResponse message from a plain object. Also converts values to their respective internal types. + * @function fromObject + * @memberof FetchResponse + * @static + * @param {Object.} d Plain object + * @returns {FetchResponse} FetchResponse + */ + FetchResponse.fromObject = function fromObject(d) { + if (d instanceof $root.FetchResponse) + return d; + var m = new $root.FetchResponse(); + switch (d.status) { + case "OK": + case 0: + m.status = 0; + break; + case "NOT_FOUND": + case 1: + m.status = 1; + break; + case "ERROR": + case 2: + m.status = 2; + break; + } + if (d.data != null) { + if (typeof d.data === "string") + $util.base64.decode(d.data, m.data = $util.newBuffer($util.base64.length(d.data)), 0); + else if (d.data.length) + m.data = d.data; + } + return m; + }; + + /** + * Creates a plain object from a FetchResponse message. Also converts values to other types if specified. + * @function toObject + * @memberof FetchResponse + * @static + * @param {FetchResponse} m FetchResponse + * @param {$protobuf.IConversionOptions} [o] Conversion options + * @returns {Object.} Plain object + */ + FetchResponse.toObject = function toObject(m, o) { + if (!o) + o = {}; + var d = {}; + if (o.defaults) { + d.status = o.enums === String ? "OK" : 0; + if (o.bytes === String) + d.data = ""; + else { + d.data = []; + if (o.bytes !== Array) + d.data = $util.newBuffer(d.data); + } + } + if (m.status != null && m.hasOwnProperty("status")) { + d.status = o.enums === String ? $root.FetchResponse.StatusCode[m.status] : m.status; + } + if (m.data != null && m.hasOwnProperty("data")) { + d.data = o.bytes === String ? $util.base64.encode(m.data, 0, m.data.length) : o.bytes === Array ? Array.prototype.slice.call(m.data) : m.data; + } + return d; + }; + + /** + * Converts this FetchResponse to JSON. + * @function toJSON + * @memberof FetchResponse + * @instance + * @returns {Object.} JSON object + */ + FetchResponse.prototype.toJSON = function toJSON() { + return this.constructor.toObject(this, $protobuf.util.toJSONOptions); + }; + + /** + * StatusCode enum. + * @name FetchResponse.StatusCode + * @enum {number} + * @property {number} OK=0 OK value + * @property {number} NOT_FOUND=1 NOT_FOUND value + * @property {number} ERROR=2 ERROR value + */ + FetchResponse.StatusCode = (function() { + var valuesById = {}, values = Object.create(valuesById); + values[valuesById[0] = "OK"] = 0; + values[valuesById[1] = "NOT_FOUND"] = 1; + values[valuesById[2] = "ERROR"] = 2; + return values; + })(); + + return FetchResponse; +})(); + +module.exports = $root; diff --git a/src/fetch/proto.proto b/src/fetch/proto.proto new file mode 100644 index 0000000000..95956f1622 --- /dev/null +++ b/src/fetch/proto.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +message FetchRequest { + string identifier = 1; +} + +message FetchResponse { + StatusCode status = 1; + enum StatusCode { + OK = 0; + NOT_FOUND = 1; + ERROR = 2; + } + bytes data = 2; +} diff --git a/src/index.js b/src/index.js index edb715442b..966eb77bbd 100644 --- a/src/index.js +++ b/src/index.js @@ -31,6 +31,7 @@ const PubsubAdapter = require('./pubsub-adapter') const Registrar = require('./registrar') const ping = require('./ping') const IdentifyService = require('./identify') +const FetchService = require('./fetch') const NatManager = require('./nat-manager') const { updateSelfPeerRecord } = require('./record/utils') @@ -323,6 +324,8 @@ class Libp2p extends EventEmitter { ping.mount(this) this._onDiscoveryPeer = this._onDiscoveryPeer.bind(this) + + this.fetchService = new FetchService(this) } /** @@ -356,6 +359,10 @@ class Libp2p extends EventEmitter { await this.handle(Object.values(IdentifyService.getProtocolStr(this)), this.identifyService.handleMessage) } + if (this.fetchService) { + await this.handle(FetchService.PROTOCOL, this.fetchService.handleMessage) + } + try { await this._onStarting() await this._onDidStart() @@ -407,6 +414,8 @@ class Libp2p extends EventEmitter { await this.natManager.stop() await this.transportManager.close() + this.unhandle(FetchService.PROTOCOL) + ping.unmount(this) this.dialer.destroy() } catch (/** @type {any} */ err) { @@ -559,6 +568,17 @@ class Libp2p extends EventEmitter { ) } + /** + * Sends a request to fetch the value associated with the given key from the given peer. + * + * @param {PeerId|Multiaddr} peer + * @param {string} key + * @returns {Promise} + */ + fetch (peer, key) { + return this.fetchService.fetch(peer, key) + } + /** * Pings the given peer in order to obtain the operation latency. * diff --git a/test/fetch/fetch.node.js b/test/fetch/fetch.node.js new file mode 100644 index 0000000000..da78495951 --- /dev/null +++ b/test/fetch/fetch.node.js @@ -0,0 +1,155 @@ +'use strict' +/* eslint-env mocha */ + +const { expect } = require('aegir/utils/chai') +const Libp2p = require('../../src') +const TCP = require('libp2p-tcp') +const Mplex = require('libp2p-mplex') +const { NOISE } = require('@chainsafe/libp2p-noise') +const MDNS = require('libp2p-mdns') +const { createPeerId } = require('../utils/creators/peer') +const { codes } = require('../../src/errors') +const { Multiaddr } = require('multiaddr') + +async function createLibp2pNode (peerId) { + return await Libp2p.create({ + peerId, + addresses: { + listen: ['/ip4/0.0.0.0/tcp/0'] + }, + modules: { + transport: [TCP], + streamMuxer: [Mplex], + connEncryption: [NOISE], + peerDiscovery: [MDNS] + } + }) +} + +describe('Fetch', () => { + /** @type {Libp2p} */ + let sender + /** @type {Libp2p} */ + let receiver + const PREFIX_A = '/moduleA/' + const PREFIX_B = '/moduleB/' + const DATA_A = { foobar: 'hello world' } + const DATA_B = { foobar: 'goodnight moon' } + + const generateLookupFunction = function (prefix, data) { + return async function (key) { + key = key.slice(prefix.length) // strip prefix from key + const val = data[key] + if (val) { + return (new TextEncoder()).encode(val) + } + return null + } + } + + beforeEach(async () => { + const [peerIdA, peerIdB] = await createPeerId({ number: 2 }) + sender = await createLibp2pNode(peerIdA) + receiver = await createLibp2pNode(peerIdB) + + await sender.start() + await receiver.start() + + await Promise.all([ + ...sender.multiaddrs.map(addr => receiver.dial(addr.encapsulate(new Multiaddr(`/p2p/${sender.peerId}`)))), + ...receiver.multiaddrs.map(addr => sender.dial(addr.encapsulate(new Multiaddr(`/p2p/${receiver.peerId}`)))) + ]) + }) + + afterEach(async () => { + receiver.fetchService.unregisterLookupFunction(PREFIX_A) + receiver.fetchService.unregisterLookupFunction(PREFIX_B) + + await sender.stop() + await receiver.stop() + }) + + it('fetch key that exists in receivers datastore', async () => { + receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) + + const rawData = await sender.fetch(receiver.peerId, '/moduleA/foobar') + const value = (new TextDecoder()).decode(rawData) + expect(value).to.equal('hello world') + }) + + it('Different lookups for different prefixes', async () => { + receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) + receiver.fetchService.registerLookupFunction(PREFIX_B, generateLookupFunction(PREFIX_B, DATA_B)) + + const rawDataA = await sender.fetch(receiver.peerId, '/moduleA/foobar') + const valueA = (new TextDecoder()).decode(rawDataA) + expect(valueA).to.equal('hello world') + + // Different lookup functions can be registered on different prefixes, and have different + // values for the same key underneath the different prefix. + const rawDataB = await sender.fetch(receiver.peerId, '/moduleB/foobar') + const valueB = (new TextDecoder()).decode(rawDataB) + expect(valueB).to.equal('goodnight moon') + }) + + it('fetch key that does not exist in receivers datastore', async () => { + receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) + const result = await sender.fetch(receiver.peerId, '/moduleA/garbage') + + expect(result).to.equal(null) + }) + + it('fetch key with unknown prefix throws error', async () => { + receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) + + await expect(sender.fetch(receiver.peerId, '/moduleUNKNOWN/foobar')) + .to.eventually.be.rejected.with.property('code', codes.ERR_INVALID_PARAMETERS) + }) + + it('registering multiple handlers for same prefix errors', async () => { + receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) + + expect(() => receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_B))) + .to.throw().with.property('code', codes.ERR_KEY_ALREADY_EXISTS) + }) + + it('can unregister handler', async () => { + const lookupFunction = generateLookupFunction(PREFIX_A, DATA_A) + receiver.fetchService.registerLookupFunction(PREFIX_A, lookupFunction) + const rawDataA = await sender.fetch(receiver.peerId, '/moduleA/foobar') + const valueA = (new TextDecoder()).decode(rawDataA) + expect(valueA).to.equal('hello world') + + receiver.fetchService.unregisterLookupFunction(PREFIX_A, lookupFunction) + + await expect(sender.fetch(receiver.peerId, '/moduleA/foobar')) + .to.eventually.be.rejectedWith(/No lookup function registered for key/) + }) + + it('can unregister all handlers', async () => { + const lookupFunction = generateLookupFunction(PREFIX_A, DATA_A) + receiver.fetchService.registerLookupFunction(PREFIX_A, lookupFunction) + const rawDataA = await sender.fetch(receiver.peerId, '/moduleA/foobar') + const valueA = (new TextDecoder()).decode(rawDataA) + expect(valueA).to.equal('hello world') + + receiver.fetchService.unregisterLookupFunction(PREFIX_A) + + await expect(sender.fetch(receiver.peerId, '/moduleA/foobar')) + .to.eventually.be.rejectedWith(/No lookup function registered for key/) + }) + + it('does not unregister wrong handlers', async () => { + const lookupFunction = generateLookupFunction(PREFIX_A, DATA_A) + receiver.fetchService.registerLookupFunction(PREFIX_A, lookupFunction) + const rawDataA = await sender.fetch(receiver.peerId, '/moduleA/foobar') + const valueA = (new TextDecoder()).decode(rawDataA) + expect(valueA).to.equal('hello world') + + receiver.fetchService.unregisterLookupFunction(PREFIX_A, () => {}) + + const rawDataB = await sender.fetch(receiver.peerId, '/moduleA/foobar') + const valueB = (new TextDecoder()).decode(rawDataB) + expect(valueB).to.equal('hello world') + }) +}) diff --git a/tsconfig.json b/tsconfig.json index 0f357f9363..eafbf8ca8a 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -8,6 +8,7 @@ ], "exclude": [ "src/circuit/protocol/index.js", // exclude generated file + "src/fetch/proto.js", // exclude generated file "src/identify/message.js", // exclude generated file "src/insecure/proto.js", // exclude generated file "src/peer-store/pb/peer.js", // exclude generated file