From afb5188a956a6d3cc5a679e49030f140e6cbe17d Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Wed, 24 Nov 2021 13:14:34 -0500 Subject: [PATCH 01/24] rebuild protobufs on clean build --- src/identify/message.d.ts | 25 +++- src/identify/message.js | 113 ++++++++++++------ src/insecure/proto.d.ts | 8 +- src/insecure/proto.js | 43 +++++-- .../persistent/pb/address-book.d.ts | 5 +- src/peer-store/persistent/pb/address-book.js | 21 +++- 6 files changed, 158 insertions(+), 57 deletions(-) diff --git a/src/identify/message.d.ts b/src/identify/message.d.ts index ba49c586aa..561dbc5569 100644 --- a/src/identify/message.d.ts +++ b/src/identify/message.d.ts @@ -34,25 +34,40 @@ export class Identify implements IIdentify { constructor(p?: IIdentify); /** Identify protocolVersion. */ - public protocolVersion: string; + public protocolVersion?: (string|null); /** Identify agentVersion. */ - public agentVersion: string; + public agentVersion?: (string|null); /** Identify publicKey. */ - public publicKey: Uint8Array; + public publicKey?: (Uint8Array|null); /** Identify listenAddrs. */ public listenAddrs: Uint8Array[]; /** Identify observedAddr. */ - public observedAddr: Uint8Array; + public observedAddr?: (Uint8Array|null); /** Identify protocols. */ public protocols: string[]; /** Identify signedPeerRecord. */ - public signedPeerRecord: Uint8Array; + public signedPeerRecord?: (Uint8Array|null); + + /** Identify _protocolVersion. */ + public _protocolVersion?: "protocolVersion"; + + /** Identify _agentVersion. */ + public _agentVersion?: "agentVersion"; + + /** Identify _publicKey. */ + public _publicKey?: "publicKey"; + + /** Identify _observedAddr. */ + public _observedAddr?: "observedAddr"; + + /** Identify _signedPeerRecord. */ + public _signedPeerRecord?: "signedPeerRecord"; /** * Encodes the specified Identify message. Does not implicitly {@link Identify.verify|verify} messages. diff --git a/src/identify/message.js b/src/identify/message.js index f4f04febdb..3c596fd41c 100644 --- a/src/identify/message.js +++ b/src/identify/message.js @@ -43,27 +43,27 @@ $root.Identify = (function() { /** * Identify protocolVersion. - * @member {string} protocolVersion + * @member {string|null|undefined} protocolVersion * @memberof Identify * @instance */ - Identify.prototype.protocolVersion = ""; + Identify.prototype.protocolVersion = null; /** * Identify agentVersion. - * @member {string} agentVersion + * @member {string|null|undefined} agentVersion * @memberof Identify * @instance */ - Identify.prototype.agentVersion = ""; + Identify.prototype.agentVersion = null; /** * Identify publicKey. - * @member {Uint8Array} publicKey + * @member {Uint8Array|null|undefined} publicKey * @memberof Identify * @instance */ - Identify.prototype.publicKey = $util.newBuffer([]); + Identify.prototype.publicKey = null; /** * Identify listenAddrs. @@ -75,11 +75,11 @@ $root.Identify = (function() { /** * Identify observedAddr. - * @member {Uint8Array} observedAddr + * @member {Uint8Array|null|undefined} observedAddr * @memberof Identify * @instance */ - Identify.prototype.observedAddr = $util.newBuffer([]); + Identify.prototype.observedAddr = null; /** * Identify protocols. @@ -91,11 +91,69 @@ $root.Identify = (function() { /** * Identify signedPeerRecord. - * @member {Uint8Array} signedPeerRecord + * @member {Uint8Array|null|undefined} signedPeerRecord * @memberof Identify * @instance */ - Identify.prototype.signedPeerRecord = $util.newBuffer([]); + Identify.prototype.signedPeerRecord = null; + + // OneOf field names bound to virtual getters and setters + var $oneOfFields; + + /** + * Identify _protocolVersion. + * @member {"protocolVersion"|undefined} _protocolVersion + * @memberof Identify + * @instance + */ + Object.defineProperty(Identify.prototype, "_protocolVersion", { + get: $util.oneOfGetter($oneOfFields = ["protocolVersion"]), + set: $util.oneOfSetter($oneOfFields) + }); + + /** + * Identify _agentVersion. + * @member {"agentVersion"|undefined} _agentVersion + * @memberof Identify + * @instance + */ + Object.defineProperty(Identify.prototype, "_agentVersion", { + get: $util.oneOfGetter($oneOfFields = ["agentVersion"]), + set: $util.oneOfSetter($oneOfFields) + }); + + /** + * Identify _publicKey. + * @member {"publicKey"|undefined} _publicKey + * @memberof Identify + * @instance + */ + Object.defineProperty(Identify.prototype, "_publicKey", { + get: $util.oneOfGetter($oneOfFields = ["publicKey"]), + set: $util.oneOfSetter($oneOfFields) + }); + + /** + * Identify _observedAddr. + * @member {"observedAddr"|undefined} _observedAddr + * @memberof Identify + * @instance + */ + Object.defineProperty(Identify.prototype, "_observedAddr", { + get: $util.oneOfGetter($oneOfFields = ["observedAddr"]), + set: $util.oneOfSetter($oneOfFields) + }); + + /** + * Identify _signedPeerRecord. + * @member {"signedPeerRecord"|undefined} _signedPeerRecord + * @memberof Identify + * @instance + */ + Object.defineProperty(Identify.prototype, "_signedPeerRecord", { + get: $util.oneOfGetter($oneOfFields = ["signedPeerRecord"]), + set: $util.oneOfSetter($oneOfFields) + }); /** * Encodes the specified Identify message. Does not implicitly {@link Identify.verify|verify} messages. @@ -256,33 +314,10 @@ $root.Identify = (function() { d.listenAddrs = []; d.protocols = []; } - if (o.defaults) { - if (o.bytes === String) - d.publicKey = ""; - else { - d.publicKey = []; - if (o.bytes !== Array) - d.publicKey = $util.newBuffer(d.publicKey); - } - if (o.bytes === String) - d.observedAddr = ""; - else { - d.observedAddr = []; - if (o.bytes !== Array) - d.observedAddr = $util.newBuffer(d.observedAddr); - } - d.protocolVersion = ""; - d.agentVersion = ""; - if (o.bytes === String) - d.signedPeerRecord = ""; - else { - d.signedPeerRecord = []; - if (o.bytes !== Array) - d.signedPeerRecord = $util.newBuffer(d.signedPeerRecord); - } - } if (m.publicKey != null && m.hasOwnProperty("publicKey")) { d.publicKey = o.bytes === String ? $util.base64.encode(m.publicKey, 0, m.publicKey.length) : o.bytes === Array ? Array.prototype.slice.call(m.publicKey) : m.publicKey; + if (o.oneofs) + d._publicKey = "publicKey"; } if (m.listenAddrs && m.listenAddrs.length) { d.listenAddrs = []; @@ -298,15 +333,23 @@ $root.Identify = (function() { } if (m.observedAddr != null && m.hasOwnProperty("observedAddr")) { d.observedAddr = o.bytes === String ? $util.base64.encode(m.observedAddr, 0, m.observedAddr.length) : o.bytes === Array ? Array.prototype.slice.call(m.observedAddr) : m.observedAddr; + if (o.oneofs) + d._observedAddr = "observedAddr"; } if (m.protocolVersion != null && m.hasOwnProperty("protocolVersion")) { d.protocolVersion = m.protocolVersion; + if (o.oneofs) + d._protocolVersion = "protocolVersion"; } if (m.agentVersion != null && m.hasOwnProperty("agentVersion")) { d.agentVersion = m.agentVersion; + if (o.oneofs) + d._agentVersion = "agentVersion"; } if (m.signedPeerRecord != null && m.hasOwnProperty("signedPeerRecord")) { d.signedPeerRecord = o.bytes === String ? $util.base64.encode(m.signedPeerRecord, 0, m.signedPeerRecord.length) : o.bytes === Array ? Array.prototype.slice.call(m.signedPeerRecord) : m.signedPeerRecord; + if (o.oneofs) + d._signedPeerRecord = "signedPeerRecord"; } return d; }; diff --git a/src/insecure/proto.d.ts b/src/insecure/proto.d.ts index a4fbac0610..191c866968 100644 --- a/src/insecure/proto.d.ts +++ b/src/insecure/proto.d.ts @@ -19,11 +19,17 @@ export class Exchange implements IExchange { constructor(p?: IExchange); /** Exchange id. */ - public id: Uint8Array; + public id?: (Uint8Array|null); /** Exchange pubkey. */ public pubkey?: (IPublicKey|null); + /** Exchange _id. */ + public _id?: "id"; + + /** Exchange _pubkey. */ + public _pubkey?: "pubkey"; + /** * Encodes the specified Exchange message. Does not implicitly {@link Exchange.verify|verify} messages. * @param m Exchange message or plain object to encode diff --git a/src/insecure/proto.js b/src/insecure/proto.js index ab43d4a9ee..1a020b305b 100644 --- a/src/insecure/proto.js +++ b/src/insecure/proto.js @@ -36,11 +36,11 @@ $root.Exchange = (function() { /** * Exchange id. - * @member {Uint8Array} id + * @member {Uint8Array|null|undefined} id * @memberof Exchange * @instance */ - Exchange.prototype.id = $util.newBuffer([]); + Exchange.prototype.id = null; /** * Exchange pubkey. @@ -50,6 +50,31 @@ $root.Exchange = (function() { */ Exchange.prototype.pubkey = null; + // OneOf field names bound to virtual getters and setters + var $oneOfFields; + + /** + * Exchange _id. + * @member {"id"|undefined} _id + * @memberof Exchange + * @instance + */ + Object.defineProperty(Exchange.prototype, "_id", { + get: $util.oneOfGetter($oneOfFields = ["id"]), + set: $util.oneOfSetter($oneOfFields) + }); + + /** + * Exchange _pubkey. + * @member {"pubkey"|undefined} _pubkey + * @memberof Exchange + * @instance + */ + Object.defineProperty(Exchange.prototype, "_pubkey", { + get: $util.oneOfGetter($oneOfFields = ["pubkey"]), + set: $util.oneOfSetter($oneOfFields) + }); + /** * Encodes the specified Exchange message. Does not implicitly {@link Exchange.verify|verify} messages. * @function encode @@ -140,21 +165,15 @@ $root.Exchange = (function() { if (!o) o = {}; var d = {}; - if (o.defaults) { - if (o.bytes === String) - d.id = ""; - else { - d.id = []; - if (o.bytes !== Array) - d.id = $util.newBuffer(d.id); - } - d.pubkey = null; - } if (m.id != null && m.hasOwnProperty("id")) { d.id = o.bytes === String ? $util.base64.encode(m.id, 0, m.id.length) : o.bytes === Array ? Array.prototype.slice.call(m.id) : m.id; + if (o.oneofs) + d._id = "id"; } if (m.pubkey != null && m.hasOwnProperty("pubkey")) { d.pubkey = $root.PublicKey.toObject(m.pubkey, o); + if (o.oneofs) + d._pubkey = "pubkey"; } return d; }; diff --git a/src/peer-store/persistent/pb/address-book.d.ts b/src/peer-store/persistent/pb/address-book.d.ts index 0080a6390c..0fa4f3eb74 100644 --- a/src/peer-store/persistent/pb/address-book.d.ts +++ b/src/peer-store/persistent/pb/address-book.d.ts @@ -89,7 +89,10 @@ export namespace Addresses { public multiaddr: Uint8Array; /** Address isCertified. */ - public isCertified: boolean; + public isCertified?: (boolean|null); + + /** Address _isCertified. */ + public _isCertified?: "isCertified"; /** * Encodes the specified Address message. Does not implicitly {@link Addresses.Address.verify|verify} messages. diff --git a/src/peer-store/persistent/pb/address-book.js b/src/peer-store/persistent/pb/address-book.js index f45bc94214..548a7ea661 100644 --- a/src/peer-store/persistent/pb/address-book.js +++ b/src/peer-store/persistent/pb/address-book.js @@ -213,11 +213,25 @@ $root.Addresses = (function() { /** * Address isCertified. - * @member {boolean} isCertified + * @member {boolean|null|undefined} isCertified * @memberof Addresses.Address * @instance */ - Address.prototype.isCertified = false; + Address.prototype.isCertified = null; + + // OneOf field names bound to virtual getters and setters + var $oneOfFields; + + /** + * Address _isCertified. + * @member {"isCertified"|undefined} _isCertified + * @memberof Addresses.Address + * @instance + */ + Object.defineProperty(Address.prototype, "_isCertified", { + get: $util.oneOfGetter($oneOfFields = ["isCertified"]), + set: $util.oneOfSetter($oneOfFields) + }); /** * Encodes the specified Address message. Does not implicitly {@link Addresses.Address.verify|verify} messages. @@ -315,13 +329,14 @@ $root.Addresses = (function() { if (o.bytes !== Array) d.multiaddr = $util.newBuffer(d.multiaddr); } - d.isCertified = false; } if (m.multiaddr != null && m.hasOwnProperty("multiaddr")) { d.multiaddr = o.bytes === String ? $util.base64.encode(m.multiaddr, 0, m.multiaddr.length) : o.bytes === Array ? Array.prototype.slice.call(m.multiaddr) : m.multiaddr; } if (m.isCertified != null && m.hasOwnProperty("isCertified")) { d.isCertified = m.isCertified; + if (o.oneofs) + d._isCertified = "isCertified"; } return d; }; From 3ab5c0ed26f6c2f43499be6088b45183dc07b1ff Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Wed, 24 Nov 2021 13:48:17 -0500 Subject: [PATCH 02/24] feat: Add fetch protocol --- package.json | 6 +- src/fetch/README.md | 32 ++++ src/fetch/constants.js | 7 + src/fetch/index.js | 106 +++++++++++++ src/fetch/proto.d.ts | 134 ++++++++++++++++ src/fetch/proto.js | 333 +++++++++++++++++++++++++++++++++++++++ src/fetch/proto.proto | 15 ++ test/fetch/fetch.spec.js | 66 ++++++++ 8 files changed, 697 insertions(+), 2 deletions(-) create mode 100644 src/fetch/README.md create mode 100644 src/fetch/constants.js create mode 100644 src/fetch/index.js create mode 100644 src/fetch/proto.d.ts create mode 100644 src/fetch/proto.js create mode 100644 src/fetch/proto.proto create mode 100644 test/fetch/fetch.spec.js diff --git a/package.json b/package.json index 919d16a8b1..90fd102943 100644 --- a/package.json +++ b/package.json @@ -20,16 +20,18 @@ "scripts": { "lint": "aegir lint", "build": "aegir build", - "build:proto": "npm run build:proto:circuit && npm run build:proto:identify && npm run build:proto:plaintext && npm run build:proto:address-book && npm run build:proto:proto-book && npm run build:proto:peer-record && npm run build:proto:envelope", + "build:proto": "npm run build:proto:circuit && npm run build:proto:fetch && npm run build:proto:identify && npm run build:proto:plaintext && npm run build:proto:address-book && npm run build:proto:proto-book && npm run build:proto:peer-record && npm run build:proto:envelope", "build:proto:circuit": "pbjs -t static-module -w commonjs -r libp2p-circuit --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/circuit/protocol/index.js ./src/circuit/protocol/index.proto", + "build:proto:fetch": "pbjs -t static-module -w commonjs -r libp2p-fetch --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/fetch/proto.js ./src/fetch/proto.proto", "build:proto:identify": "pbjs -t static-module -w commonjs -r libp2p-identify --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/identify/message.js ./src/identify/message.proto", "build:proto:plaintext": "pbjs -t static-module -w commonjs -r libp2p-plaintext --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/insecure/proto.js ./src/insecure/proto.proto", "build:proto:address-book": "pbjs -t static-module -w commonjs -r libp2p-address-book --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/peer-store/persistent/pb/address-book.js ./src/peer-store/persistent/pb/address-book.proto", "build:proto:proto-book": "pbjs -t static-module -w commonjs -r libp2p-proto-book --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/peer-store/persistent/pb/proto-book.js ./src/peer-store/persistent/pb/proto-book.proto", "build:proto:peer-record": "pbjs -t static-module -w commonjs -r libp2p-peer-record --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/record/peer-record/peer-record.js ./src/record/peer-record/peer-record.proto", "build:proto:envelope": "pbjs -t static-module -w commonjs -r libp2p-envelope --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/record/envelope/envelope.js ./src/record/envelope/envelope.proto", - "build:proto-types": "npm run build:proto-types:circuit && npm run build:proto-types:identify && npm run build:proto-types:plaintext && npm run build:proto-types:address-book && npm run build:proto-types:proto-book && npm run build:proto-types:peer-record && npm run build:proto-types:envelope", + "build:proto-types": "npm run build:proto-types:circuit && npm run build:proto-types:fetch && npm run build:proto-types:identify && npm run build:proto-types:plaintext && npm run build:proto-types:address-book && npm run build:proto-types:proto-book && npm run build:proto-types:peer-record && npm run build:proto-types:envelope", "build:proto-types:circuit": "pbts -o src/circuit/protocol/index.d.ts src/circuit/protocol/index.js", + "build:proto-types:fetch": "pbts -o src/fetch/proto.d.ts src/fetch/proto.js", "build:proto-types:identify": "pbts -o src/identify/message.d.ts src/identify/message.js", "build:proto-types:plaintext": "pbts -o src/insecure/proto.d.ts src/insecure/proto.js", "build:proto-types:address-book": "pbts -o src/peer-store/persistent/pb/address-book.d.ts src/peer-store/persistent/pb/address-book.js", diff --git a/src/fetch/README.md b/src/fetch/README.md new file mode 100644 index 0000000000..b5b327ac47 --- /dev/null +++ b/src/fetch/README.md @@ -0,0 +1,32 @@ +libp2p-fetch JavaScript Implementation +===================================== + +> Libp2p fetch protocol JavaScript implementation + +## Overview + +An implementation of the Fetch protocol as described here: https://github.com/libp2p/specs/tree/master/fetch + +The fetch protocol is a simple protocol for requesting a value corresponding to a key from a peer. + +## Usage + +```javascript +var Fetch = require('libp2p/src/fetch') + +/** + * Given a key (as a string) returns a value (as a Uint8Array), or null if the key isn't found. + * @param key - a string + * @returns value - a Uint8Array value that corresponds to the given key, or null if the key doesn't + * have a corresponding value. + */ +function lookup(key) { + // app specific callback to lookup key-value pairs. +} + +Fetch.mount(libp2p, lookup) // Enable this peer to respond to fetch requests + +const value = await Fetch(libp2p, peerDst, key) + +Fetch.unmount(libp2p) +``` diff --git a/src/fetch/constants.js b/src/fetch/constants.js new file mode 100644 index 0000000000..f9db7ea480 --- /dev/null +++ b/src/fetch/constants.js @@ -0,0 +1,7 @@ +'use strict' + +module.exports = { + PROTOCOL: '/ipfs/fetch/0.0.1', // deprecated + PROTOCOL_VERSION: '0.0.1', + PROTOCOL_NAME: 'fetch' +} diff --git a/src/fetch/index.js b/src/fetch/index.js new file mode 100644 index 0000000000..73011a80f9 --- /dev/null +++ b/src/fetch/index.js @@ -0,0 +1,106 @@ +'use strict' + +const debug = require('debug') +const log = Object.assign(debug('libp2p:fetch'), { + error: debug('libp2p:fetch:err') +}) +const lp = require('it-length-prefixed') +const { FetchRequest, FetchResponse } = require('./proto') +// @ts-ignore it-handshake does not export types +const handshake = require('it-handshake') + +const { PROTOCOL_NAME, PROTOCOL_VERSION } = require('./constants') + + +/** + * @typedef {import('../')} Libp2p + * @typedef {import('multiaddr').Multiaddr} Multiaddr + * @typedef {import('peer-id')} PeerId + * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream + * @typedef {(key: string) => Promise} LookupFunction + */ + +/** + * Ping a given peer and wait for its response, getting the operation latency. + * + * @param {Libp2p} node + * @param {PeerId|Multiaddr} peer + * @param {string} key + * @returns {Promise} + */ +async function fetch (node, peer, key) { + const protocol = `/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` + // @ts-ignore multiaddr might not have toB58String + log('dialing %s to %s', protocol, peer.toB58String ? peer.toB58String() : peer) + + const connection = await node.dial(peer) + const { stream } = await connection.newStream(protocol) + const shake = handshake(stream) + + // send message + const request = new FetchRequest({ identifier: key }) + shake.write(lp.encode.single(FetchRequest.encode(request).finish())) + + // read response + const response = FetchResponse.decode((await lp.decode.fromReader(shake.reader).next()).value.slice()) + switch (response.status) { + case (FetchResponse.StatusCode.OK): { + return response.data + } + case (FetchResponse.StatusCode.NOT_FOUND): { + return null + } + case (FetchResponse.StatusCode.ERROR): { + throw new Error('Error in fetch protocol response') + } + default: { + throw new Error('Unreachable case') + } + } +} + +/** + * Invoked when a fetch request is received. Reads the request message off the given stream and + * responds based on looking up the key in the request via the lookup callback. + * @param {MuxedStream} stream + * @param {LookupFunction} lookup + */ +async function handleRequest(stream, lookup) { + const shake = handshake(stream) + const request = FetchRequest.decode((await lp.decode.fromReader(shake.reader).next()).value.slice()) + + let response + const data = await lookup(request.identifier) + if (data) { + response = new FetchResponse({ status: FetchResponse.StatusCode.OK, data }) + } else { + response = new FetchResponse({ status: FetchResponse.StatusCode.NOT_FOUND }) + } + + shake.write(lp.encode.single(FetchResponse.encode(response).finish())) +} + +/** + * Subscribe fetch protocol handler. Must be given a lookup function callback that can be used + * to lookup a value (of type Uint8Array) from a given key (of type string). The lookup function + * should return null if the key isn't found. + * + * @param {Libp2p} node + * @param {LookupFunction} lookup + */ +function mount (node, lookup) { + node.handle(`/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`, ({ stream }) => handleRequest(stream, lookup)) +} + +/** + * Unsubscribe fetch protocol handler. + * + * @param {Libp2p} node + */ +function unmount (node) { + node.unhandle(`/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`) +} + +exports = module.exports = fetch +exports.mount = mount +exports.unmount = unmount diff --git a/src/fetch/proto.d.ts b/src/fetch/proto.d.ts new file mode 100644 index 0000000000..bf022f516c --- /dev/null +++ b/src/fetch/proto.d.ts @@ -0,0 +1,134 @@ +import * as $protobuf from "protobufjs"; +/** Properties of a FetchRequest. */ +export interface IFetchRequest { + + /** FetchRequest identifier */ + identifier?: (string|null); +} + +/** Represents a FetchRequest. */ +export class FetchRequest implements IFetchRequest { + + /** + * Constructs a new FetchRequest. + * @param [p] Properties to set + */ + constructor(p?: IFetchRequest); + + /** FetchRequest identifier. */ + public identifier: string; + + /** + * Encodes the specified FetchRequest message. Does not implicitly {@link FetchRequest.verify|verify} messages. + * @param m FetchRequest message or plain object to encode + * @param [w] Writer to encode to + * @returns Writer + */ + public static encode(m: IFetchRequest, w?: $protobuf.Writer): $protobuf.Writer; + + /** + * Decodes a FetchRequest message from the specified reader or buffer. + * @param r Reader or buffer to decode from + * @param [l] Message length if known beforehand + * @returns FetchRequest + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): FetchRequest; + + /** + * Creates a FetchRequest message from a plain object. Also converts values to their respective internal types. + * @param d Plain object + * @returns FetchRequest + */ + public static fromObject(d: { [k: string]: any }): FetchRequest; + + /** + * Creates a plain object from a FetchRequest message. Also converts values to other types if specified. + * @param m FetchRequest + * @param [o] Conversion options + * @returns Plain object + */ + public static toObject(m: FetchRequest, o?: $protobuf.IConversionOptions): { [k: string]: any }; + + /** + * Converts this FetchRequest to JSON. + * @returns JSON object + */ + public toJSON(): { [k: string]: any }; +} + +/** Properties of a FetchResponse. */ +export interface IFetchResponse { + + /** FetchResponse status */ + status?: (FetchResponse.StatusCode|null); + + /** FetchResponse data */ + data?: (Uint8Array|null); +} + +/** Represents a FetchResponse. */ +export class FetchResponse implements IFetchResponse { + + /** + * Constructs a new FetchResponse. + * @param [p] Properties to set + */ + constructor(p?: IFetchResponse); + + /** FetchResponse status. */ + public status: FetchResponse.StatusCode; + + /** FetchResponse data. */ + public data: Uint8Array; + + /** + * Encodes the specified FetchResponse message. Does not implicitly {@link FetchResponse.verify|verify} messages. + * @param m FetchResponse message or plain object to encode + * @param [w] Writer to encode to + * @returns Writer + */ + public static encode(m: IFetchResponse, w?: $protobuf.Writer): $protobuf.Writer; + + /** + * Decodes a FetchResponse message from the specified reader or buffer. + * @param r Reader or buffer to decode from + * @param [l] Message length if known beforehand + * @returns FetchResponse + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): FetchResponse; + + /** + * Creates a FetchResponse message from a plain object. Also converts values to their respective internal types. + * @param d Plain object + * @returns FetchResponse + */ + public static fromObject(d: { [k: string]: any }): FetchResponse; + + /** + * Creates a plain object from a FetchResponse message. Also converts values to other types if specified. + * @param m FetchResponse + * @param [o] Conversion options + * @returns Plain object + */ + public static toObject(m: FetchResponse, o?: $protobuf.IConversionOptions): { [k: string]: any }; + + /** + * Converts this FetchResponse to JSON. + * @returns JSON object + */ + public toJSON(): { [k: string]: any }; +} + +export namespace FetchResponse { + + /** StatusCode enum. */ + enum StatusCode { + OK = 0, + NOT_FOUND = 1, + ERROR = 2 + } +} diff --git a/src/fetch/proto.js b/src/fetch/proto.js new file mode 100644 index 0000000000..f7de2b1dd5 --- /dev/null +++ b/src/fetch/proto.js @@ -0,0 +1,333 @@ +/*eslint-disable*/ +"use strict"; + +var $protobuf = require("protobufjs/minimal"); + +// Common aliases +var $Reader = $protobuf.Reader, $Writer = $protobuf.Writer, $util = $protobuf.util; + +// Exported root namespace +var $root = $protobuf.roots["libp2p-fetch"] || ($protobuf.roots["libp2p-fetch"] = {}); + +$root.FetchRequest = (function() { + + /** + * Properties of a FetchRequest. + * @exports IFetchRequest + * @interface IFetchRequest + * @property {string|null} [identifier] FetchRequest identifier + */ + + /** + * Constructs a new FetchRequest. + * @exports FetchRequest + * @classdesc Represents a FetchRequest. + * @implements IFetchRequest + * @constructor + * @param {IFetchRequest=} [p] Properties to set + */ + function FetchRequest(p) { + if (p) + for (var ks = Object.keys(p), i = 0; i < ks.length; ++i) + if (p[ks[i]] != null) + this[ks[i]] = p[ks[i]]; + } + + /** + * FetchRequest identifier. + * @member {string} identifier + * @memberof FetchRequest + * @instance + */ + FetchRequest.prototype.identifier = ""; + + /** + * Encodes the specified FetchRequest message. Does not implicitly {@link FetchRequest.verify|verify} messages. + * @function encode + * @memberof FetchRequest + * @static + * @param {IFetchRequest} m FetchRequest message or plain object to encode + * @param {$protobuf.Writer} [w] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + FetchRequest.encode = function encode(m, w) { + if (!w) + w = $Writer.create(); + if (m.identifier != null && Object.hasOwnProperty.call(m, "identifier")) + w.uint32(10).string(m.identifier); + return w; + }; + + /** + * Decodes a FetchRequest message from the specified reader or buffer. + * @function decode + * @memberof FetchRequest + * @static + * @param {$protobuf.Reader|Uint8Array} r Reader or buffer to decode from + * @param {number} [l] Message length if known beforehand + * @returns {FetchRequest} FetchRequest + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + FetchRequest.decode = function decode(r, l) { + if (!(r instanceof $Reader)) + r = $Reader.create(r); + var c = l === undefined ? r.len : r.pos + l, m = new $root.FetchRequest(); + while (r.pos < c) { + var t = r.uint32(); + switch (t >>> 3) { + case 1: + m.identifier = r.string(); + break; + default: + r.skipType(t & 7); + break; + } + } + return m; + }; + + /** + * Creates a FetchRequest message from a plain object. Also converts values to their respective internal types. + * @function fromObject + * @memberof FetchRequest + * @static + * @param {Object.} d Plain object + * @returns {FetchRequest} FetchRequest + */ + FetchRequest.fromObject = function fromObject(d) { + if (d instanceof $root.FetchRequest) + return d; + var m = new $root.FetchRequest(); + if (d.identifier != null) { + m.identifier = String(d.identifier); + } + return m; + }; + + /** + * Creates a plain object from a FetchRequest message. Also converts values to other types if specified. + * @function toObject + * @memberof FetchRequest + * @static + * @param {FetchRequest} m FetchRequest + * @param {$protobuf.IConversionOptions} [o] Conversion options + * @returns {Object.} Plain object + */ + FetchRequest.toObject = function toObject(m, o) { + if (!o) + o = {}; + var d = {}; + if (o.defaults) { + d.identifier = ""; + } + if (m.identifier != null && m.hasOwnProperty("identifier")) { + d.identifier = m.identifier; + } + return d; + }; + + /** + * Converts this FetchRequest to JSON. + * @function toJSON + * @memberof FetchRequest + * @instance + * @returns {Object.} JSON object + */ + FetchRequest.prototype.toJSON = function toJSON() { + return this.constructor.toObject(this, $protobuf.util.toJSONOptions); + }; + + return FetchRequest; +})(); + +$root.FetchResponse = (function() { + + /** + * Properties of a FetchResponse. + * @exports IFetchResponse + * @interface IFetchResponse + * @property {FetchResponse.StatusCode|null} [status] FetchResponse status + * @property {Uint8Array|null} [data] FetchResponse data + */ + + /** + * Constructs a new FetchResponse. + * @exports FetchResponse + * @classdesc Represents a FetchResponse. + * @implements IFetchResponse + * @constructor + * @param {IFetchResponse=} [p] Properties to set + */ + function FetchResponse(p) { + if (p) + for (var ks = Object.keys(p), i = 0; i < ks.length; ++i) + if (p[ks[i]] != null) + this[ks[i]] = p[ks[i]]; + } + + /** + * FetchResponse status. + * @member {FetchResponse.StatusCode} status + * @memberof FetchResponse + * @instance + */ + FetchResponse.prototype.status = 0; + + /** + * FetchResponse data. + * @member {Uint8Array} data + * @memberof FetchResponse + * @instance + */ + FetchResponse.prototype.data = $util.newBuffer([]); + + /** + * Encodes the specified FetchResponse message. Does not implicitly {@link FetchResponse.verify|verify} messages. + * @function encode + * @memberof FetchResponse + * @static + * @param {IFetchResponse} m FetchResponse message or plain object to encode + * @param {$protobuf.Writer} [w] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + FetchResponse.encode = function encode(m, w) { + if (!w) + w = $Writer.create(); + if (m.status != null && Object.hasOwnProperty.call(m, "status")) + w.uint32(8).int32(m.status); + if (m.data != null && Object.hasOwnProperty.call(m, "data")) + w.uint32(18).bytes(m.data); + return w; + }; + + /** + * Decodes a FetchResponse message from the specified reader or buffer. + * @function decode + * @memberof FetchResponse + * @static + * @param {$protobuf.Reader|Uint8Array} r Reader or buffer to decode from + * @param {number} [l] Message length if known beforehand + * @returns {FetchResponse} FetchResponse + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + FetchResponse.decode = function decode(r, l) { + if (!(r instanceof $Reader)) + r = $Reader.create(r); + var c = l === undefined ? r.len : r.pos + l, m = new $root.FetchResponse(); + while (r.pos < c) { + var t = r.uint32(); + switch (t >>> 3) { + case 1: + m.status = r.int32(); + break; + case 2: + m.data = r.bytes(); + break; + default: + r.skipType(t & 7); + break; + } + } + return m; + }; + + /** + * Creates a FetchResponse message from a plain object. Also converts values to their respective internal types. + * @function fromObject + * @memberof FetchResponse + * @static + * @param {Object.} d Plain object + * @returns {FetchResponse} FetchResponse + */ + FetchResponse.fromObject = function fromObject(d) { + if (d instanceof $root.FetchResponse) + return d; + var m = new $root.FetchResponse(); + switch (d.status) { + case "OK": + case 0: + m.status = 0; + break; + case "NOT_FOUND": + case 1: + m.status = 1; + break; + case "ERROR": + case 2: + m.status = 2; + break; + } + if (d.data != null) { + if (typeof d.data === "string") + $util.base64.decode(d.data, m.data = $util.newBuffer($util.base64.length(d.data)), 0); + else if (d.data.length) + m.data = d.data; + } + return m; + }; + + /** + * Creates a plain object from a FetchResponse message. Also converts values to other types if specified. + * @function toObject + * @memberof FetchResponse + * @static + * @param {FetchResponse} m FetchResponse + * @param {$protobuf.IConversionOptions} [o] Conversion options + * @returns {Object.} Plain object + */ + FetchResponse.toObject = function toObject(m, o) { + if (!o) + o = {}; + var d = {}; + if (o.defaults) { + d.status = o.enums === String ? "OK" : 0; + if (o.bytes === String) + d.data = ""; + else { + d.data = []; + if (o.bytes !== Array) + d.data = $util.newBuffer(d.data); + } + } + if (m.status != null && m.hasOwnProperty("status")) { + d.status = o.enums === String ? $root.FetchResponse.StatusCode[m.status] : m.status; + } + if (m.data != null && m.hasOwnProperty("data")) { + d.data = o.bytes === String ? $util.base64.encode(m.data, 0, m.data.length) : o.bytes === Array ? Array.prototype.slice.call(m.data) : m.data; + } + return d; + }; + + /** + * Converts this FetchResponse to JSON. + * @function toJSON + * @memberof FetchResponse + * @instance + * @returns {Object.} JSON object + */ + FetchResponse.prototype.toJSON = function toJSON() { + return this.constructor.toObject(this, $protobuf.util.toJSONOptions); + }; + + /** + * StatusCode enum. + * @name FetchResponse.StatusCode + * @enum {number} + * @property {number} OK=0 OK value + * @property {number} NOT_FOUND=1 NOT_FOUND value + * @property {number} ERROR=2 ERROR value + */ + FetchResponse.StatusCode = (function() { + var valuesById = {}, values = Object.create(valuesById); + values[valuesById[0] = "OK"] = 0; + values[valuesById[1] = "NOT_FOUND"] = 1; + values[valuesById[2] = "ERROR"] = 2; + return values; + })(); + + return FetchResponse; +})(); + +module.exports = $root; diff --git a/src/fetch/proto.proto b/src/fetch/proto.proto new file mode 100644 index 0000000000..95956f1622 --- /dev/null +++ b/src/fetch/proto.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +message FetchRequest { + string identifier = 1; +} + +message FetchResponse { + StatusCode status = 1; + enum StatusCode { + OK = 0; + NOT_FOUND = 1; + ERROR = 2; + } + bytes data = 2; +} diff --git a/test/fetch/fetch.spec.js b/test/fetch/fetch.spec.js new file mode 100644 index 0000000000..285067ac28 --- /dev/null +++ b/test/fetch/fetch.spec.js @@ -0,0 +1,66 @@ +'use strict' +/* eslint-env mocha */ + +const { expect } = require('aegir/utils/chai') +const Libp2p = require('../../src') +const TCP = require('libp2p-tcp') +const Mplex = require('libp2p-mplex') +const { NOISE } = require('@chainsafe/libp2p-noise') +const MDNS = require('libp2p-mdns') +const Fetch = require('../../src/fetch') + +async function createLibp2pNode (lookupFunc) { + return await Libp2p.create({ + addresses: { + listen: ['/ip4/0.0.0.0/tcp/0'] + }, + modules: { + transport: [TCP], + streamMuxer: [Mplex], + connEncryption: [NOISE], + peerDiscovery: [MDNS] + } + }) +} + +describe('Fetch Protocol', () => { + let sender + let receiver + const DATA = { foobar: 'hello world' } + + before(async () => { + sender = await createLibp2pNode() + receiver = await createLibp2pNode() + + const lookupFunc = async function (key) { + const val = DATA[key] + if (val) { + return (new TextEncoder()).encode(val) + } + return null + } + + Fetch.mount(receiver, lookupFunc) + + await sender.start() + await receiver.start() + }) + + after(async () => { + await sender.stop() + await receiver.stop() + }) + + it('fetch key that exists in receivers datastore', async () => { + const rawData = await Fetch(sender, receiver.peerId, 'foobar') + const value = (new TextDecoder()).decode(rawData) + + expect(value).to.equal('hello world') + }) + + it('fetch key that does not exist in receivers datastore', async () => { + const result = await Fetch(sender, receiver.peerId, 'garbage') + + expect(result).to.equal(null) + }) +}) From 9559b2b1b35a1b7a961eb1a91551f98edd2ff7ec Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Wed, 24 Nov 2021 15:39:42 -0500 Subject: [PATCH 03/24] update readme --- src/fetch/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fetch/README.md b/src/fetch/README.md index b5b327ac47..2354f6a7e2 100644 --- a/src/fetch/README.md +++ b/src/fetch/README.md @@ -20,7 +20,7 @@ var Fetch = require('libp2p/src/fetch') * @returns value - a Uint8Array value that corresponds to the given key, or null if the key doesn't * have a corresponding value. */ -function lookup(key) { +async function lookup(key) { // app specific callback to lookup key-value pairs. } From 7ceb9c7e6f0648cb5bffb42663664474a08f726e Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Mon, 29 Nov 2021 12:57:22 -0500 Subject: [PATCH 04/24] fix comment --- src/fetch/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fetch/index.js b/src/fetch/index.js index 73011a80f9..16440ea20c 100644 --- a/src/fetch/index.js +++ b/src/fetch/index.js @@ -21,7 +21,7 @@ const { PROTOCOL_NAME, PROTOCOL_VERSION } = require('./constants') */ /** - * Ping a given peer and wait for its response, getting the operation latency. + * Sends a request to fetch the value associated with the given key from the given peer. * * @param {Libp2p} node * @param {PeerId|Multiaddr} peer From 5a9eea045b8bc49b06afef065286735f3a158f62 Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Mon, 29 Nov 2021 13:14:39 -0500 Subject: [PATCH 05/24] wip: use a class to have a stateful instance of fetch protocol --- src/fetch/index.js | 159 +++++++++++++++++++++------------------ test/fetch/fetch.spec.js | 8 +- 2 files changed, 92 insertions(+), 75 deletions(-) diff --git a/src/fetch/index.js b/src/fetch/index.js index 16440ea20c..4a9bffd20b 100644 --- a/src/fetch/index.js +++ b/src/fetch/index.js @@ -11,7 +11,6 @@ const handshake = require('it-handshake') const { PROTOCOL_NAME, PROTOCOL_VERSION } = require('./constants') - /** * @typedef {import('../')} Libp2p * @typedef {import('multiaddr').Multiaddr} Multiaddr @@ -20,87 +19,103 @@ const { PROTOCOL_NAME, PROTOCOL_VERSION } = require('./constants') * @typedef {(key: string) => Promise} LookupFunction */ -/** - * Sends a request to fetch the value associated with the given key from the given peer. - * - * @param {Libp2p} node - * @param {PeerId|Multiaddr} peer - * @param {string} key - * @returns {Promise} - */ -async function fetch (node, peer, key) { - const protocol = `/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` - // @ts-ignore multiaddr might not have toB58String - log('dialing %s to %s', protocol, peer.toB58String ? peer.toB58String() : peer) +class FetchProtocol { + constructor () { + this.lookup = null + } - const connection = await node.dial(peer) - const { stream } = await connection.newStream(protocol) - const shake = handshake(stream) + /** + * Sends a request to fetch the value associated with the given key from the given peer. + * + * @param {Libp2p} node + * @param {PeerId|Multiaddr} peer + * @param {string} key + * @returns {Promise} + */ + static async fetch (node, peer, key) { + const protocol = `/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` + // @ts-ignore multiaddr might not have toB58String + log('dialing %s to %s', protocol, peer.toB58String ? peer.toB58String() : peer) - // send message - const request = new FetchRequest({ identifier: key }) - shake.write(lp.encode.single(FetchRequest.encode(request).finish())) + const connection = await node.dial(peer) + const { stream } = await connection.newStream(protocol) + const shake = handshake(stream) - // read response - const response = FetchResponse.decode((await lp.decode.fromReader(shake.reader).next()).value.slice()) - switch (response.status) { - case (FetchResponse.StatusCode.OK): { - return response.data - } - case (FetchResponse.StatusCode.NOT_FOUND): { - return null - } - case (FetchResponse.StatusCode.ERROR): { - throw new Error('Error in fetch protocol response') - } - default: { - throw new Error('Unreachable case') + // send message + const request = new FetchRequest({ identifier: key }) + shake.write(lp.encode.single(FetchRequest.encode(request).finish())) + + // read response + const response = FetchResponse.decode((await lp.decode.fromReader(shake.reader).next()).value.slice()) + switch (response.status) { + case (FetchResponse.StatusCode.OK): { + return response.data + } + case (FetchResponse.StatusCode.NOT_FOUND): { + return null + } + case (FetchResponse.StatusCode.ERROR): { + throw new Error('Error in fetch protocol response') + } + default: { + throw new Error('Unreachable case') + } } } -} -/** - * Invoked when a fetch request is received. Reads the request message off the given stream and - * responds based on looking up the key in the request via the lookup callback. - * @param {MuxedStream} stream - * @param {LookupFunction} lookup - */ -async function handleRequest(stream, lookup) { - const shake = handshake(stream) - const request = FetchRequest.decode((await lp.decode.fromReader(shake.reader).next()).value.slice()) + /** + * Invoked when a fetch request is received. Reads the request message off the given stream and + * responds based on looking up the key in the request via the lookup callback. + * + * @param {MuxedStream} stream + */ + async handleRequest (stream) { + const shake = handshake(stream) + const request = FetchRequest.decode((await lp.decode.fromReader(shake.reader).next()).value.slice()) - let response - const data = await lookup(request.identifier) - if (data) { - response = new FetchResponse({ status: FetchResponse.StatusCode.OK, data }) - } else { - response = new FetchResponse({ status: FetchResponse.StatusCode.NOT_FOUND }) + let response + if (this.lookup) { + const data = await this.lookup(request.identifier) + if (data) { + response = new FetchResponse({ status: FetchResponse.StatusCode.OK, data }) + } else { + response = new FetchResponse({ status: FetchResponse.StatusCode.NOT_FOUND }) + } + } else { + response = new FetchResponse({ status: FetchResponse.StatusCode.NOT_FOUND }) + } + + shake.write(lp.encode.single(FetchResponse.encode(response).finish())) } - shake.write(lp.encode.single(FetchResponse.encode(response).finish())) -} + /** + * TODO rename and comments + * + * @param {LookupFunction} lookupFunc + */ + registerLookupFunction (lookupFunc) { + this.lookup = lookupFunc + } -/** - * Subscribe fetch protocol handler. Must be given a lookup function callback that can be used - * to lookup a value (of type Uint8Array) from a given key (of type string). The lookup function - * should return null if the key isn't found. - * - * @param {Libp2p} node - * @param {LookupFunction} lookup - */ -function mount (node, lookup) { - node.handle(`/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`, ({ stream }) => handleRequest(stream, lookup)) -} + /** + * Subscribe fetch protocol handler. Must be given a lookup function callback that can be used + * to lookup a value (of type Uint8Array) from a given key (of type string). The lookup function + * should return null if the key isn't found. + * + * @param {Libp2p} node + */ + mount (node) { + node.handle(`/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`, ({ stream }) => this.handleRequest(stream)) + } -/** - * Unsubscribe fetch protocol handler. - * - * @param {Libp2p} node - */ -function unmount (node) { - node.unhandle(`/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`) + /** + * Unsubscribe fetch protocol handler. + * + * @param {Libp2p} node + */ + unmount (node) { + node.unhandle(`/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`) + } } -exports = module.exports = fetch -exports.mount = mount -exports.unmount = unmount +exports = module.exports = FetchProtocol diff --git a/test/fetch/fetch.spec.js b/test/fetch/fetch.spec.js index 285067ac28..abfe5c70de 100644 --- a/test/fetch/fetch.spec.js +++ b/test/fetch/fetch.spec.js @@ -40,7 +40,9 @@ describe('Fetch Protocol', () => { return null } - Fetch.mount(receiver, lookupFunc) + const fetch = new Fetch() + fetch.registerLookupFunction(lookupFunc) + fetch.mount(receiver) await sender.start() await receiver.start() @@ -52,14 +54,14 @@ describe('Fetch Protocol', () => { }) it('fetch key that exists in receivers datastore', async () => { - const rawData = await Fetch(sender, receiver.peerId, 'foobar') + const rawData = await Fetch.fetch(sender, receiver.peerId, 'foobar') const value = (new TextDecoder()).decode(rawData) expect(value).to.equal('hello world') }) it('fetch key that does not exist in receivers datastore', async () => { - const result = await Fetch(sender, receiver.peerId, 'garbage') + const result = await Fetch.fetch(sender, receiver.peerId, 'garbage') expect(result).to.equal(null) }) From 2686e5ece8c7be7b3bd862a8fa664cf17ae5b133 Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Mon, 29 Nov 2021 13:22:36 -0500 Subject: [PATCH 06/24] can register multiple lookup functions --- src/fetch/index.js | 32 +++++++++++++++++++++++++++----- test/fetch/fetch.spec.js | 8 ++++---- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/src/fetch/index.js b/src/fetch/index.js index 4a9bffd20b..acb2528e0f 100644 --- a/src/fetch/index.js +++ b/src/fetch/index.js @@ -19,9 +19,12 @@ const { PROTOCOL_NAME, PROTOCOL_VERSION } = require('./constants') * @typedef {(key: string) => Promise} LookupFunction */ +/** + * TODO comments + */ class FetchProtocol { constructor () { - this.lookup = null + this.lookupFunctions = new Map() // Maps key prefix to value lookup function } /** @@ -74,8 +77,9 @@ class FetchProtocol { const request = FetchRequest.decode((await lp.decode.fromReader(shake.reader).next()).value.slice()) let response - if (this.lookup) { - const data = await this.lookup(request.identifier) + const lookup = this.getLookupFunction(request.identifier) + if (lookup) { + const data = await lookup(request.identifier) if (data) { response = new FetchResponse({ status: FetchResponse.StatusCode.OK, data }) } else { @@ -88,13 +92,31 @@ class FetchProtocol { shake.write(lp.encode.single(FetchResponse.encode(response).finish())) } + /** + * TODO + * + * @param {string} key + */ + getLookupFunction (key) { + for (const prefix of this.lookupFunctions.keys()) { + if (key.startsWith(prefix)) { + return this.lookupFunctions.get(prefix) + } + } + return null + } + /** * TODO rename and comments * + * @param {string} prefix * @param {LookupFunction} lookupFunc */ - registerLookupFunction (lookupFunc) { - this.lookup = lookupFunc + registerLookupFunction (prefix, lookupFunc) { + if (this.lookupFunctions.has(prefix)) { + throw new Error("Fetch protocol handler for key prefix '" + prefix + "' already registered") + } + this.lookupFunctions.set(prefix, lookupFunc) } /** diff --git a/test/fetch/fetch.spec.js b/test/fetch/fetch.spec.js index abfe5c70de..f0a1cb4f16 100644 --- a/test/fetch/fetch.spec.js +++ b/test/fetch/fetch.spec.js @@ -26,7 +26,7 @@ async function createLibp2pNode (lookupFunc) { describe('Fetch Protocol', () => { let sender let receiver - const DATA = { foobar: 'hello world' } + const DATA = { '/moduleA/foobar': 'hello world' } before(async () => { sender = await createLibp2pNode() @@ -41,7 +41,7 @@ describe('Fetch Protocol', () => { } const fetch = new Fetch() - fetch.registerLookupFunction(lookupFunc) + fetch.registerLookupFunction('/moduleA/', lookupFunc) fetch.mount(receiver) await sender.start() @@ -54,14 +54,14 @@ describe('Fetch Protocol', () => { }) it('fetch key that exists in receivers datastore', async () => { - const rawData = await Fetch.fetch(sender, receiver.peerId, 'foobar') + const rawData = await Fetch.fetch(sender, receiver.peerId, '/moduleA/foobar') const value = (new TextDecoder()).decode(rawData) expect(value).to.equal('hello world') }) it('fetch key that does not exist in receivers datastore', async () => { - const result = await Fetch.fetch(sender, receiver.peerId, 'garbage') + const result = await Fetch.fetch(sender, receiver.peerId, '/moduleA/garbage') expect(result).to.equal(null) }) From dbb33158ea8b19a2cfbc182071e7c5782ef041a4 Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Mon, 29 Nov 2021 13:44:45 -0500 Subject: [PATCH 07/24] update tests --- test/fetch/fetch.spec.js | 65 +++++++++++++++++++++++++++++++++------- 1 file changed, 54 insertions(+), 11 deletions(-) diff --git a/test/fetch/fetch.spec.js b/test/fetch/fetch.spec.js index f0a1cb4f16..3464834d05 100644 --- a/test/fetch/fetch.spec.js +++ b/test/fetch/fetch.spec.js @@ -26,23 +26,25 @@ async function createLibp2pNode (lookupFunc) { describe('Fetch Protocol', () => { let sender let receiver - const DATA = { '/moduleA/foobar': 'hello world' } + const PREFIX_A = '/moduleA/' + const PREFIX_B = '/moduleB/' + const DATA_A = { 'foobar': 'hello world' } + const DATA_B = { 'foobar': 'goodnight moon' } - before(async () => { - sender = await createLibp2pNode() - receiver = await createLibp2pNode() - - const lookupFunc = async function (key) { - const val = DATA[key] + const generateLookupFunction = function (prefix, data) { + return async function (key) { + key = key.slice(prefix.length) // strip prefix from key + const val = data[key] if (val) { return (new TextEncoder()).encode(val) } return null } + } - const fetch = new Fetch() - fetch.registerLookupFunction('/moduleA/', lookupFunc) - fetch.mount(receiver) + before(async () => { + sender = await createLibp2pNode() + receiver = await createLibp2pNode() await sender.start() await receiver.start() @@ -54,15 +56,56 @@ describe('Fetch Protocol', () => { }) it('fetch key that exists in receivers datastore', async () => { + const fetch = new Fetch() + fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) + fetch.mount(receiver) + const rawData = await Fetch.fetch(sender, receiver.peerId, '/moduleA/foobar') const value = (new TextDecoder()).decode(rawData) - expect(value).to.equal('hello world') }) + it('Different lookups for different prefixes', async () => { + const fetch = new Fetch() + fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) + fetch.registerLookupFunction(PREFIX_B, generateLookupFunction(PREFIX_B, DATA_B)) + fetch.mount(receiver) + + const rawDataA = await Fetch.fetch(sender, receiver.peerId, '/moduleA/foobar') + const valueA = (new TextDecoder()).decode(rawDataA) + expect(valueA).to.equal('hello world') + + // Different lookup functions can be registered on different prefixes, and have different + // values for the same key underneath the different prefix. + const rawDataB = await Fetch.fetch(sender, receiver.peerId, '/moduleB/foobar') + const valueB = (new TextDecoder()).decode(rawDataB) + expect(valueB).to.equal('goodnight moon') + }) + it('fetch key that does not exist in receivers datastore', async () => { + const fetch = new Fetch() + fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) + fetch.mount(receiver) const result = await Fetch.fetch(sender, receiver.peerId, '/moduleA/garbage') expect(result).to.equal(null) }) + + it('fetch key with unknown prefix returns null', async () => { + const fetch = new Fetch() + fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) + fetch.mount(receiver) + + const result = await Fetch.fetch(sender, receiver.peerId, '/moduleC/foobar') + + expect(result).to.equal(null) + }) + + it('Registering multiple handlers for same prefix errors', async () => { + const fetch = new Fetch() + fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) + expect(function () { + fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_B)) + }).to.throw('already registered') + }) }) From 3d73cc001a92eb59800301f0b652e90573c7b721 Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Mon, 29 Nov 2021 13:55:27 -0500 Subject: [PATCH 08/24] update docs/comments --- src/fetch/README.md | 16 +++++++++++----- src/fetch/index.js | 16 +++++++++++----- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/src/fetch/README.md b/src/fetch/README.md index 2354f6a7e2..338b8aaf32 100644 --- a/src/fetch/README.md +++ b/src/fetch/README.md @@ -12,21 +12,27 @@ The fetch protocol is a simple protocol for requesting a value corresponding to ## Usage ```javascript -var Fetch = require('libp2p/src/fetch') +var FetchProtocol = require('libp2p/src/fetch') /** * Given a key (as a string) returns a value (as a Uint8Array), or null if the key isn't found. + * All keys must be prefixed my the same prefix, which will be used to find the appropriate key + * lookup function. * @param key - a string * @returns value - a Uint8Array value that corresponds to the given key, or null if the key doesn't * have a corresponding value. */ -async function lookup(key) { +async function my_subsystem_key_lookup(key) { // app specific callback to lookup key-value pairs. } -Fetch.mount(libp2p, lookup) // Enable this peer to respond to fetch requests +// Enable this peer to respond to fetch requests for keys that begin with '/my_subsystem_key_prefix/' +const fetch = new FetchProtocol() +fetch.registerLookupFunction('/my_subsystem_key_prefix/', my_subsystem_key_lookup) +fetch.mount(libp2p) -const value = await Fetch(libp2p, peerDst, key) +const key = '/my_subsystem_key_prefix/{...}' +const value = await FetchProtocol.fetch(libp2p, peerDst, key) -Fetch.unmount(libp2p) +FetchProtocol.unmount(libp2p) ``` diff --git a/src/fetch/index.js b/src/fetch/index.js index acb2528e0f..13f6d6c56c 100644 --- a/src/fetch/index.js +++ b/src/fetch/index.js @@ -20,7 +20,10 @@ const { PROTOCOL_NAME, PROTOCOL_VERSION } = require('./constants') */ /** - * TODO comments + * A simple libp2p protocol for requesting a value corresponding to a key from a peer. + * Developers can register one or more lookup function for retrieving the value corresponding to + * a given key. Each lookup function must act on a distinct part of the overall key space, defined + * by a fixed prefix that all keys that should be routed to that lookup function will start with. */ class FetchProtocol { constructor () { @@ -68,7 +71,8 @@ class FetchProtocol { /** * Invoked when a fetch request is received. Reads the request message off the given stream and - * responds based on looking up the key in the request via the lookup callback. + * responds based on looking up the key in the request via the lookup callback that corresponds + * to the key's prefix. * * @param {MuxedStream} stream */ @@ -93,7 +97,8 @@ class FetchProtocol { } /** - * TODO + * Given a key, finds the appropriate function for looking up its corresponding value, based on + * the key's prefix. * * @param {string} key */ @@ -107,7 +112,8 @@ class FetchProtocol { } /** - * TODO rename and comments + * Registers a new lookup callback that can map keys to values, for a given set of keys that + * share the same prefix. * * @param {string} prefix * @param {LookupFunction} lookupFunc @@ -135,7 +141,7 @@ class FetchProtocol { * * @param {Libp2p} node */ - unmount (node) { + static unmount (node) { node.unhandle(`/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`) } } From d75ef0881c6511501096c82e73373ad43e3d74e7 Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Mon, 29 Nov 2021 14:11:35 -0500 Subject: [PATCH 09/24] Use error when missing key handler instead of just returning null --- src/fetch/index.js | 6 ++++-- test/fetch/fetch.spec.js | 12 ++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/fetch/index.js b/src/fetch/index.js index 13f6d6c56c..fce317933d 100644 --- a/src/fetch/index.js +++ b/src/fetch/index.js @@ -61,7 +61,8 @@ class FetchProtocol { return null } case (FetchResponse.StatusCode.ERROR): { - throw new Error('Error in fetch protocol response') + const errmsg = (new TextDecoder()).decode(response.data) + throw new Error('Error in fetch protocol response: ' + errmsg) } default: { throw new Error('Unreachable case') @@ -90,7 +91,8 @@ class FetchProtocol { response = new FetchResponse({ status: FetchResponse.StatusCode.NOT_FOUND }) } } else { - response = new FetchResponse({ status: FetchResponse.StatusCode.NOT_FOUND }) + const errmsg = (new TextEncoder()).encode('No lookup function registered for key: ' + request.identifier) + response = new FetchResponse({ status: FetchResponse.StatusCode.ERROR, data: errmsg }) } shake.write(lp.encode.single(FetchResponse.encode(response).finish())) diff --git a/test/fetch/fetch.spec.js b/test/fetch/fetch.spec.js index 3464834d05..426dbd6927 100644 --- a/test/fetch/fetch.spec.js +++ b/test/fetch/fetch.spec.js @@ -91,14 +91,18 @@ describe('Fetch Protocol', () => { expect(result).to.equal(null) }) - it('fetch key with unknown prefix returns null', async () => { + it('fetch key with unknown prefix throws error', async () => { const fetch = new Fetch() fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) fetch.mount(receiver) - const result = await Fetch.fetch(sender, receiver.peerId, '/moduleC/foobar') - - expect(result).to.equal(null) + try { + await Fetch.fetch(sender, receiver.peerId, '/moduleUNKNOWN/foobar') + expect.fail("didn't throw") + } catch (err) { + expect(err).to.be.an('Error') + expect(err.message).to.equal('Error in fetch protocol response: No lookup function registered for key: /moduleUNKNOWN/foobar') + } }) it('Registering multiple handlers for same prefix errors', async () => { From 012636696a2e1fca126efc829e420b376f2b49d1 Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Mon, 29 Nov 2021 14:16:52 -0500 Subject: [PATCH 10/24] underscore prefix private methods/variables --- src/fetch/index.js | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/fetch/index.js b/src/fetch/index.js index fce317933d..db5868a8a8 100644 --- a/src/fetch/index.js +++ b/src/fetch/index.js @@ -27,7 +27,7 @@ const { PROTOCOL_NAME, PROTOCOL_VERSION } = require('./constants') */ class FetchProtocol { constructor () { - this.lookupFunctions = new Map() // Maps key prefix to value lookup function + this._lookupFunctions = new Map() // Maps key prefix to value lookup function } /** @@ -77,12 +77,12 @@ class FetchProtocol { * * @param {MuxedStream} stream */ - async handleRequest (stream) { + async _handleRequest (stream) { const shake = handshake(stream) const request = FetchRequest.decode((await lp.decode.fromReader(shake.reader).next()).value.slice()) let response - const lookup = this.getLookupFunction(request.identifier) + const lookup = this._getLookupFunction(request.identifier) if (lookup) { const data = await lookup(request.identifier) if (data) { @@ -104,10 +104,10 @@ class FetchProtocol { * * @param {string} key */ - getLookupFunction (key) { - for (const prefix of this.lookupFunctions.keys()) { + _getLookupFunction (key) { + for (const prefix of this._lookupFunctions.keys()) { if (key.startsWith(prefix)) { - return this.lookupFunctions.get(prefix) + return this._lookupFunctions.get(prefix) } } return null @@ -121,10 +121,10 @@ class FetchProtocol { * @param {LookupFunction} lookupFunc */ registerLookupFunction (prefix, lookupFunc) { - if (this.lookupFunctions.has(prefix)) { + if (this._lookupFunctions.has(prefix)) { throw new Error("Fetch protocol handler for key prefix '" + prefix + "' already registered") } - this.lookupFunctions.set(prefix, lookupFunc) + this._lookupFunctions.set(prefix, lookupFunc) } /** @@ -135,7 +135,7 @@ class FetchProtocol { * @param {Libp2p} node */ mount (node) { - node.handle(`/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`, ({ stream }) => this.handleRequest(stream)) + node.handle(`/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`, ({ stream }) => this._handleRequest(stream)) } /** From 839d349f8e0c60b295fc84429cdf65f14cdfd30d Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Mon, 29 Nov 2021 14:17:59 -0500 Subject: [PATCH 11/24] small comment cleanup --- src/fetch/index.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/fetch/index.js b/src/fetch/index.js index db5868a8a8..7d296da8df 100644 --- a/src/fetch/index.js +++ b/src/fetch/index.js @@ -128,9 +128,7 @@ class FetchProtocol { } /** - * Subscribe fetch protocol handler. Must be given a lookup function callback that can be used - * to lookup a value (of type Uint8Array) from a given key (of type string). The lookup function - * should return null if the key isn't found. + * Subscribe fetch protocol handler. * * @param {Libp2p} node */ From daf72321e83feb4e386c2dba513b658c04960abe Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Mon, 29 Nov 2021 14:19:13 -0500 Subject: [PATCH 12/24] minor --- src/fetch/index.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/fetch/index.js b/src/fetch/index.js index 7d296da8df..86e1e7d429 100644 --- a/src/fetch/index.js +++ b/src/fetch/index.js @@ -118,13 +118,13 @@ class FetchProtocol { * share the same prefix. * * @param {string} prefix - * @param {LookupFunction} lookupFunc + * @param {LookupFunction} lookup */ - registerLookupFunction (prefix, lookupFunc) { + registerLookupFunction (prefix, lookup) { if (this._lookupFunctions.has(prefix)) { throw new Error("Fetch protocol handler for key prefix '" + prefix + "' already registered") } - this._lookupFunctions.set(prefix, lookupFunc) + this._lookupFunctions.set(prefix, lookup) } /** From d1627a99d62d53d84782dc0eb5ceada14c5080d6 Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Fri, 7 Jan 2022 14:17:07 -0500 Subject: [PATCH 13/24] revert generated protobuf code from other modules --- src/identify/message.d.ts | 25 +--- src/identify/message.js | 113 ++++++------------ src/insecure/proto.d.ts | 8 +- src/insecure/proto.js | 43 ++----- .../persistent/pb/address-book.d.ts | 5 +- src/peer-store/persistent/pb/address-book.js | 21 +--- 6 files changed, 57 insertions(+), 158 deletions(-) diff --git a/src/identify/message.d.ts b/src/identify/message.d.ts index 561dbc5569..ba49c586aa 100644 --- a/src/identify/message.d.ts +++ b/src/identify/message.d.ts @@ -34,40 +34,25 @@ export class Identify implements IIdentify { constructor(p?: IIdentify); /** Identify protocolVersion. */ - public protocolVersion?: (string|null); + public protocolVersion: string; /** Identify agentVersion. */ - public agentVersion?: (string|null); + public agentVersion: string; /** Identify publicKey. */ - public publicKey?: (Uint8Array|null); + public publicKey: Uint8Array; /** Identify listenAddrs. */ public listenAddrs: Uint8Array[]; /** Identify observedAddr. */ - public observedAddr?: (Uint8Array|null); + public observedAddr: Uint8Array; /** Identify protocols. */ public protocols: string[]; /** Identify signedPeerRecord. */ - public signedPeerRecord?: (Uint8Array|null); - - /** Identify _protocolVersion. */ - public _protocolVersion?: "protocolVersion"; - - /** Identify _agentVersion. */ - public _agentVersion?: "agentVersion"; - - /** Identify _publicKey. */ - public _publicKey?: "publicKey"; - - /** Identify _observedAddr. */ - public _observedAddr?: "observedAddr"; - - /** Identify _signedPeerRecord. */ - public _signedPeerRecord?: "signedPeerRecord"; + public signedPeerRecord: Uint8Array; /** * Encodes the specified Identify message. Does not implicitly {@link Identify.verify|verify} messages. diff --git a/src/identify/message.js b/src/identify/message.js index 3c596fd41c..f4f04febdb 100644 --- a/src/identify/message.js +++ b/src/identify/message.js @@ -43,27 +43,27 @@ $root.Identify = (function() { /** * Identify protocolVersion. - * @member {string|null|undefined} protocolVersion + * @member {string} protocolVersion * @memberof Identify * @instance */ - Identify.prototype.protocolVersion = null; + Identify.prototype.protocolVersion = ""; /** * Identify agentVersion. - * @member {string|null|undefined} agentVersion + * @member {string} agentVersion * @memberof Identify * @instance */ - Identify.prototype.agentVersion = null; + Identify.prototype.agentVersion = ""; /** * Identify publicKey. - * @member {Uint8Array|null|undefined} publicKey + * @member {Uint8Array} publicKey * @memberof Identify * @instance */ - Identify.prototype.publicKey = null; + Identify.prototype.publicKey = $util.newBuffer([]); /** * Identify listenAddrs. @@ -75,11 +75,11 @@ $root.Identify = (function() { /** * Identify observedAddr. - * @member {Uint8Array|null|undefined} observedAddr + * @member {Uint8Array} observedAddr * @memberof Identify * @instance */ - Identify.prototype.observedAddr = null; + Identify.prototype.observedAddr = $util.newBuffer([]); /** * Identify protocols. @@ -91,69 +91,11 @@ $root.Identify = (function() { /** * Identify signedPeerRecord. - * @member {Uint8Array|null|undefined} signedPeerRecord + * @member {Uint8Array} signedPeerRecord * @memberof Identify * @instance */ - Identify.prototype.signedPeerRecord = null; - - // OneOf field names bound to virtual getters and setters - var $oneOfFields; - - /** - * Identify _protocolVersion. - * @member {"protocolVersion"|undefined} _protocolVersion - * @memberof Identify - * @instance - */ - Object.defineProperty(Identify.prototype, "_protocolVersion", { - get: $util.oneOfGetter($oneOfFields = ["protocolVersion"]), - set: $util.oneOfSetter($oneOfFields) - }); - - /** - * Identify _agentVersion. - * @member {"agentVersion"|undefined} _agentVersion - * @memberof Identify - * @instance - */ - Object.defineProperty(Identify.prototype, "_agentVersion", { - get: $util.oneOfGetter($oneOfFields = ["agentVersion"]), - set: $util.oneOfSetter($oneOfFields) - }); - - /** - * Identify _publicKey. - * @member {"publicKey"|undefined} _publicKey - * @memberof Identify - * @instance - */ - Object.defineProperty(Identify.prototype, "_publicKey", { - get: $util.oneOfGetter($oneOfFields = ["publicKey"]), - set: $util.oneOfSetter($oneOfFields) - }); - - /** - * Identify _observedAddr. - * @member {"observedAddr"|undefined} _observedAddr - * @memberof Identify - * @instance - */ - Object.defineProperty(Identify.prototype, "_observedAddr", { - get: $util.oneOfGetter($oneOfFields = ["observedAddr"]), - set: $util.oneOfSetter($oneOfFields) - }); - - /** - * Identify _signedPeerRecord. - * @member {"signedPeerRecord"|undefined} _signedPeerRecord - * @memberof Identify - * @instance - */ - Object.defineProperty(Identify.prototype, "_signedPeerRecord", { - get: $util.oneOfGetter($oneOfFields = ["signedPeerRecord"]), - set: $util.oneOfSetter($oneOfFields) - }); + Identify.prototype.signedPeerRecord = $util.newBuffer([]); /** * Encodes the specified Identify message. Does not implicitly {@link Identify.verify|verify} messages. @@ -314,10 +256,33 @@ $root.Identify = (function() { d.listenAddrs = []; d.protocols = []; } + if (o.defaults) { + if (o.bytes === String) + d.publicKey = ""; + else { + d.publicKey = []; + if (o.bytes !== Array) + d.publicKey = $util.newBuffer(d.publicKey); + } + if (o.bytes === String) + d.observedAddr = ""; + else { + d.observedAddr = []; + if (o.bytes !== Array) + d.observedAddr = $util.newBuffer(d.observedAddr); + } + d.protocolVersion = ""; + d.agentVersion = ""; + if (o.bytes === String) + d.signedPeerRecord = ""; + else { + d.signedPeerRecord = []; + if (o.bytes !== Array) + d.signedPeerRecord = $util.newBuffer(d.signedPeerRecord); + } + } if (m.publicKey != null && m.hasOwnProperty("publicKey")) { d.publicKey = o.bytes === String ? $util.base64.encode(m.publicKey, 0, m.publicKey.length) : o.bytes === Array ? Array.prototype.slice.call(m.publicKey) : m.publicKey; - if (o.oneofs) - d._publicKey = "publicKey"; } if (m.listenAddrs && m.listenAddrs.length) { d.listenAddrs = []; @@ -333,23 +298,15 @@ $root.Identify = (function() { } if (m.observedAddr != null && m.hasOwnProperty("observedAddr")) { d.observedAddr = o.bytes === String ? $util.base64.encode(m.observedAddr, 0, m.observedAddr.length) : o.bytes === Array ? Array.prototype.slice.call(m.observedAddr) : m.observedAddr; - if (o.oneofs) - d._observedAddr = "observedAddr"; } if (m.protocolVersion != null && m.hasOwnProperty("protocolVersion")) { d.protocolVersion = m.protocolVersion; - if (o.oneofs) - d._protocolVersion = "protocolVersion"; } if (m.agentVersion != null && m.hasOwnProperty("agentVersion")) { d.agentVersion = m.agentVersion; - if (o.oneofs) - d._agentVersion = "agentVersion"; } if (m.signedPeerRecord != null && m.hasOwnProperty("signedPeerRecord")) { d.signedPeerRecord = o.bytes === String ? $util.base64.encode(m.signedPeerRecord, 0, m.signedPeerRecord.length) : o.bytes === Array ? Array.prototype.slice.call(m.signedPeerRecord) : m.signedPeerRecord; - if (o.oneofs) - d._signedPeerRecord = "signedPeerRecord"; } return d; }; diff --git a/src/insecure/proto.d.ts b/src/insecure/proto.d.ts index 191c866968..a4fbac0610 100644 --- a/src/insecure/proto.d.ts +++ b/src/insecure/proto.d.ts @@ -19,17 +19,11 @@ export class Exchange implements IExchange { constructor(p?: IExchange); /** Exchange id. */ - public id?: (Uint8Array|null); + public id: Uint8Array; /** Exchange pubkey. */ public pubkey?: (IPublicKey|null); - /** Exchange _id. */ - public _id?: "id"; - - /** Exchange _pubkey. */ - public _pubkey?: "pubkey"; - /** * Encodes the specified Exchange message. Does not implicitly {@link Exchange.verify|verify} messages. * @param m Exchange message or plain object to encode diff --git a/src/insecure/proto.js b/src/insecure/proto.js index 1a020b305b..ab43d4a9ee 100644 --- a/src/insecure/proto.js +++ b/src/insecure/proto.js @@ -36,11 +36,11 @@ $root.Exchange = (function() { /** * Exchange id. - * @member {Uint8Array|null|undefined} id + * @member {Uint8Array} id * @memberof Exchange * @instance */ - Exchange.prototype.id = null; + Exchange.prototype.id = $util.newBuffer([]); /** * Exchange pubkey. @@ -50,31 +50,6 @@ $root.Exchange = (function() { */ Exchange.prototype.pubkey = null; - // OneOf field names bound to virtual getters and setters - var $oneOfFields; - - /** - * Exchange _id. - * @member {"id"|undefined} _id - * @memberof Exchange - * @instance - */ - Object.defineProperty(Exchange.prototype, "_id", { - get: $util.oneOfGetter($oneOfFields = ["id"]), - set: $util.oneOfSetter($oneOfFields) - }); - - /** - * Exchange _pubkey. - * @member {"pubkey"|undefined} _pubkey - * @memberof Exchange - * @instance - */ - Object.defineProperty(Exchange.prototype, "_pubkey", { - get: $util.oneOfGetter($oneOfFields = ["pubkey"]), - set: $util.oneOfSetter($oneOfFields) - }); - /** * Encodes the specified Exchange message. Does not implicitly {@link Exchange.verify|verify} messages. * @function encode @@ -165,15 +140,21 @@ $root.Exchange = (function() { if (!o) o = {}; var d = {}; + if (o.defaults) { + if (o.bytes === String) + d.id = ""; + else { + d.id = []; + if (o.bytes !== Array) + d.id = $util.newBuffer(d.id); + } + d.pubkey = null; + } if (m.id != null && m.hasOwnProperty("id")) { d.id = o.bytes === String ? $util.base64.encode(m.id, 0, m.id.length) : o.bytes === Array ? Array.prototype.slice.call(m.id) : m.id; - if (o.oneofs) - d._id = "id"; } if (m.pubkey != null && m.hasOwnProperty("pubkey")) { d.pubkey = $root.PublicKey.toObject(m.pubkey, o); - if (o.oneofs) - d._pubkey = "pubkey"; } return d; }; diff --git a/src/peer-store/persistent/pb/address-book.d.ts b/src/peer-store/persistent/pb/address-book.d.ts index 0fa4f3eb74..0080a6390c 100644 --- a/src/peer-store/persistent/pb/address-book.d.ts +++ b/src/peer-store/persistent/pb/address-book.d.ts @@ -89,10 +89,7 @@ export namespace Addresses { public multiaddr: Uint8Array; /** Address isCertified. */ - public isCertified?: (boolean|null); - - /** Address _isCertified. */ - public _isCertified?: "isCertified"; + public isCertified: boolean; /** * Encodes the specified Address message. Does not implicitly {@link Addresses.Address.verify|verify} messages. diff --git a/src/peer-store/persistent/pb/address-book.js b/src/peer-store/persistent/pb/address-book.js index 548a7ea661..f45bc94214 100644 --- a/src/peer-store/persistent/pb/address-book.js +++ b/src/peer-store/persistent/pb/address-book.js @@ -213,25 +213,11 @@ $root.Addresses = (function() { /** * Address isCertified. - * @member {boolean|null|undefined} isCertified + * @member {boolean} isCertified * @memberof Addresses.Address * @instance */ - Address.prototype.isCertified = null; - - // OneOf field names bound to virtual getters and setters - var $oneOfFields; - - /** - * Address _isCertified. - * @member {"isCertified"|undefined} _isCertified - * @memberof Addresses.Address - * @instance - */ - Object.defineProperty(Address.prototype, "_isCertified", { - get: $util.oneOfGetter($oneOfFields = ["isCertified"]), - set: $util.oneOfSetter($oneOfFields) - }); + Address.prototype.isCertified = false; /** * Encodes the specified Address message. Does not implicitly {@link Addresses.Address.verify|verify} messages. @@ -329,14 +315,13 @@ $root.Addresses = (function() { if (o.bytes !== Array) d.multiaddr = $util.newBuffer(d.multiaddr); } + d.isCertified = false; } if (m.multiaddr != null && m.hasOwnProperty("multiaddr")) { d.multiaddr = o.bytes === String ? $util.base64.encode(m.multiaddr, 0, m.multiaddr.length) : o.bytes === Array ? Array.prototype.slice.call(m.multiaddr) : m.multiaddr; } if (m.isCertified != null && m.hasOwnProperty("isCertified")) { d.isCertified = m.isCertified; - if (o.oneofs) - d._isCertified = "isCertified"; } return d; }; From 5882a0edf79250dd78270b245df36f4dd2fe1f11 Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Fri, 7 Jan 2022 15:13:01 -0500 Subject: [PATCH 14/24] fix test setup --- test/fetch/fetch.spec.js | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/test/fetch/fetch.spec.js b/test/fetch/fetch.spec.js index 426dbd6927..4a37c091fa 100644 --- a/test/fetch/fetch.spec.js +++ b/test/fetch/fetch.spec.js @@ -7,10 +7,12 @@ const TCP = require('libp2p-tcp') const Mplex = require('libp2p-mplex') const { NOISE } = require('@chainsafe/libp2p-noise') const MDNS = require('libp2p-mdns') +const { createPeerId } = require('../utils/creators/peer') const Fetch = require('../../src/fetch') -async function createLibp2pNode (lookupFunc) { +async function createLibp2pNode (peerId) { return await Libp2p.create({ + peerId, addresses: { listen: ['/ip4/0.0.0.0/tcp/0'] }, @@ -43,8 +45,9 @@ describe('Fetch Protocol', () => { } before(async () => { - sender = await createLibp2pNode() - receiver = await createLibp2pNode() + const [peerIdA, peerIdB] = await createPeerId({ number: 2 }) + sender = await createLibp2pNode(peerIdA) + receiver = await createLibp2pNode(peerIdB) await sender.start() await receiver.start() From d658007d5c7d16a0a1b9d35adaf2e94541988b4e Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Fri, 7 Jan 2022 15:23:53 -0500 Subject: [PATCH 15/24] use error codes --- src/errors.js | 3 ++- src/fetch/index.js | 8 +++++--- test/fetch/fetch.spec.js | 18 +++++++++--------- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/src/errors.js b/src/errors.js index 61f308667e..cedb0cfcd0 100644 --- a/src/errors.js +++ b/src/errors.js @@ -60,5 +60,6 @@ exports.codes = { ERR_INVALID_NEW_PASS_TYPE: 'ERR_INVALID_NEW_PASS_TYPE', ERR_INVALID_PASS_LENGTH: 'ERR_INVALID_PASS_LENGTH', ERR_NOT_IMPLEMENTED: 'ERR_NOT_IMPLEMENTED', - ERR_WRONG_PING_ACK: 'ERR_WRONG_PING_ACK' + ERR_WRONG_PING_ACK: 'ERR_WRONG_PING_ACK', + ERR_UNHANDLED_CASE: 'ERR_UNHANDLED_CASE' } diff --git a/src/fetch/index.js b/src/fetch/index.js index 86e1e7d429..5829afda6c 100644 --- a/src/fetch/index.js +++ b/src/fetch/index.js @@ -4,6 +4,8 @@ const debug = require('debug') const log = Object.assign(debug('libp2p:fetch'), { error: debug('libp2p:fetch:err') }) +const errCode = require('err-code') +const { codes } = require('../errors') const lp = require('it-length-prefixed') const { FetchRequest, FetchResponse } = require('./proto') // @ts-ignore it-handshake does not export types @@ -62,10 +64,10 @@ class FetchProtocol { } case (FetchResponse.StatusCode.ERROR): { const errmsg = (new TextDecoder()).decode(response.data) - throw new Error('Error in fetch protocol response: ' + errmsg) + throw errCode(new Error('Error in fetch protocol response: ' + errmsg), codes.ERR_INVALID_PARAMETERS) } default: { - throw new Error('Unreachable case') + throw errCode(new Error('Unreachable case'), codes.ERR_UNHANDLED_CASE) } } } @@ -122,7 +124,7 @@ class FetchProtocol { */ registerLookupFunction (prefix, lookup) { if (this._lookupFunctions.has(prefix)) { - throw new Error("Fetch protocol handler for key prefix '" + prefix + "' already registered") + throw errCode(new Error("Fetch protocol handler for key prefix '" + prefix + "' already registered"), codes.ERR_KEY_ALREADY_EXISTS) } this._lookupFunctions.set(prefix, lookup) } diff --git a/test/fetch/fetch.spec.js b/test/fetch/fetch.spec.js index 4a37c091fa..aa7cb00d41 100644 --- a/test/fetch/fetch.spec.js +++ b/test/fetch/fetch.spec.js @@ -9,6 +9,7 @@ const { NOISE } = require('@chainsafe/libp2p-noise') const MDNS = require('libp2p-mdns') const { createPeerId } = require('../utils/creators/peer') const Fetch = require('../../src/fetch') +const { codes } = require('../../src/errors') async function createLibp2pNode (peerId) { return await Libp2p.create({ @@ -99,20 +100,19 @@ describe('Fetch Protocol', () => { fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) fetch.mount(receiver) - try { - await Fetch.fetch(sender, receiver.peerId, '/moduleUNKNOWN/foobar') - expect.fail("didn't throw") - } catch (err) { - expect(err).to.be.an('Error') - expect(err.message).to.equal('Error in fetch protocol response: No lookup function registered for key: /moduleUNKNOWN/foobar') - } + await expect(Fetch.fetch(sender, receiver.peerId, '/moduleUNKNOWN/foobar')).to.eventually.be.rejected.with.property('code', codes.ERR_INVALID_PARAMETERS) }) it('Registering multiple handlers for same prefix errors', async () => { const fetch = new Fetch() fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) - expect(function () { + + try { fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_B)) - }).to.throw('already registered') + expect.fail("didn't throw") + } catch (err) { + expect(err).to.be.an('Error') + expect(err.code).to.equal(codes.ERR_KEY_ALREADY_EXISTS) + } }) }) From ef2d860ca46f8ffb66698cb12dfa6690dbd3b969 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 24 Jan 2022 14:53:03 +0000 Subject: [PATCH 16/24] chore: fix linting and tests --- package.json | 1 - test/fetch/{fetch.spec.js => fetch.node.js} | 17 ++++++----------- tsconfig.json | 1 + 3 files changed, 7 insertions(+), 12 deletions(-) rename test/fetch/{fetch.spec.js => fetch.node.js} (88%) diff --git a/package.json b/package.json index 357ddd9213..a0c2b6fba5 100644 --- a/package.json +++ b/package.json @@ -42,7 +42,6 @@ "test:browser": "aegir test -t browser", "test:examples": "cd examples && npm run test:all", "test:interop": "LIBP2P_JS=$PWD npx aegir test -t node -f ./node_modules/libp2p-interop/test/*", - "prepare": "npm run build", "coverage": "nyc --reporter=text --reporter=lcov npm run test:node" }, "repository": { diff --git a/test/fetch/fetch.spec.js b/test/fetch/fetch.node.js similarity index 88% rename from test/fetch/fetch.spec.js rename to test/fetch/fetch.node.js index aa7cb00d41..48dc4a7b5e 100644 --- a/test/fetch/fetch.spec.js +++ b/test/fetch/fetch.node.js @@ -26,13 +26,13 @@ async function createLibp2pNode (peerId) { }) } -describe('Fetch Protocol', () => { +describe('Fetch', () => { let sender let receiver const PREFIX_A = '/moduleA/' const PREFIX_B = '/moduleB/' - const DATA_A = { 'foobar': 'hello world' } - const DATA_B = { 'foobar': 'goodnight moon' } + const DATA_A = { foobar: 'hello world' } + const DATA_B = { foobar: 'goodnight moon' } const generateLookupFunction = function (prefix, data) { return async function (key) { @@ -103,16 +103,11 @@ describe('Fetch Protocol', () => { await expect(Fetch.fetch(sender, receiver.peerId, '/moduleUNKNOWN/foobar')).to.eventually.be.rejected.with.property('code', codes.ERR_INVALID_PARAMETERS) }) - it('Registering multiple handlers for same prefix errors', async () => { + it('registering multiple handlers for same prefix errors', async () => { const fetch = new Fetch() fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) - try { - fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_B)) - expect.fail("didn't throw") - } catch (err) { - expect(err).to.be.an('Error') - expect(err.code).to.equal(codes.ERR_KEY_ALREADY_EXISTS) - } + expect(() => fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_B))) + .to.throw().with.property('code', codes.ERR_KEY_ALREADY_EXISTS) }) }) diff --git a/tsconfig.json b/tsconfig.json index 0f357f9363..eafbf8ca8a 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -8,6 +8,7 @@ ], "exclude": [ "src/circuit/protocol/index.js", // exclude generated file + "src/fetch/proto.js", // exclude generated file "src/identify/message.js", // exclude generated file "src/insecure/proto.js", // exclude generated file "src/peer-store/pb/peer.js", // exclude generated file From bb61c9e43b9b61537c1b341a05b4c27619df882f Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 24 Jan 2022 15:33:50 +0000 Subject: [PATCH 17/24] chore: refactor for consistency with identify service --- src/fetch/README.md | 14 +++++----- src/fetch/index.js | 57 +++++++++++++++++++++++++--------------- src/index.js | 9 +++++++ test/fetch/fetch.node.js | 50 ++++++++++++++++++----------------- 4 files changed, 77 insertions(+), 53 deletions(-) diff --git a/src/fetch/README.md b/src/fetch/README.md index 338b8aaf32..f71bd54bd8 100644 --- a/src/fetch/README.md +++ b/src/fetch/README.md @@ -12,14 +12,14 @@ The fetch protocol is a simple protocol for requesting a value corresponding to ## Usage ```javascript -var FetchProtocol = require('libp2p/src/fetch') +const Libp2p = require('libp2p') /** * Given a key (as a string) returns a value (as a Uint8Array), or null if the key isn't found. * All keys must be prefixed my the same prefix, which will be used to find the appropriate key * lookup function. * @param key - a string - * @returns value - a Uint8Array value that corresponds to the given key, or null if the key doesn't + * @returns value - a Uint8Array value that corresponds to the given key, or null if the key doesn't * have a corresponding value. */ async function my_subsystem_key_lookup(key) { @@ -27,12 +27,10 @@ async function my_subsystem_key_lookup(key) { } // Enable this peer to respond to fetch requests for keys that begin with '/my_subsystem_key_prefix/' -const fetch = new FetchProtocol() -fetch.registerLookupFunction('/my_subsystem_key_prefix/', my_subsystem_key_lookup) -fetch.mount(libp2p) +const libp2p = Libp2p.create(...) +libp2p.fetchService.registerLookupFunction('/my_subsystem_key_prefix/', my_subsystem_key_lookup) const key = '/my_subsystem_key_prefix/{...}' -const value = await FetchProtocol.fetch(libp2p, peerDst, key) - -FetchProtocol.unmount(libp2p) +const peerDst = PeerId.parse('Qmfoo...') // or Multiaddr instance +const value = await libp2p.fetchService.fetch(libp2p, peerDst, key) ``` diff --git a/src/fetch/index.js b/src/fetch/index.js index 5829afda6c..dae970ea5a 100644 --- a/src/fetch/index.js +++ b/src/fetch/index.js @@ -28,25 +28,36 @@ const { PROTOCOL_NAME, PROTOCOL_VERSION } = require('./constants') * by a fixed prefix that all keys that should be routed to that lookup function will start with. */ class FetchProtocol { - constructor () { + /** + * @param {Libp2p} libp2p + */ + static getProtocolStr (libp2p) { + return `/${libp2p._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` + } + + /** + * @param {Libp2p} libp2p + */ + constructor (libp2p) { this._lookupFunctions = new Map() // Maps key prefix to value lookup function + this._libp2p = libp2p + this._protocol = `/${this._libp2p._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` + this.handleMessage = this.handleMessage.bind(this) } /** * Sends a request to fetch the value associated with the given key from the given peer. * - * @param {Libp2p} node * @param {PeerId|Multiaddr} peer * @param {string} key * @returns {Promise} */ - static async fetch (node, peer, key) { - const protocol = `/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` + async fetch (peer, key) { // @ts-ignore multiaddr might not have toB58String - log('dialing %s to %s', protocol, peer.toB58String ? peer.toB58String() : peer) + log('dialing %s to %s', this._protocol, peer.toB58String ? peer.toB58String() : peer) - const connection = await node.dial(peer) - const { stream } = await connection.newStream(protocol) + const connection = await this._libp2p.dial(peer) + const { stream } = await connection.newStream(this._protocol) const shake = handshake(stream) // send message @@ -77,9 +88,12 @@ class FetchProtocol { * responds based on looking up the key in the request via the lookup callback that corresponds * to the key's prefix. * - * @param {MuxedStream} stream + * @param {object} options + * @param {MuxedStream} options.stream + * @param {string} options.protocol */ - async _handleRequest (stream) { + async handleMessage (options) { + const { stream } = options const shake = handshake(stream) const request = FetchRequest.decode((await lp.decode.fromReader(shake.reader).next()).value.slice()) @@ -130,21 +144,22 @@ class FetchProtocol { } /** - * Subscribe fetch protocol handler. + * Registers a new lookup callback that can map keys to values, for a given set of keys that + * share the same prefix. * - * @param {Libp2p} node + * @param {string} prefix + * @param {LookupFunction} [lookup] */ - mount (node) { - node.handle(`/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`, ({ stream }) => this._handleRequest(stream)) - } + unregisterLookupFunction (prefix, lookup) { + if (lookup != null) { + const existingLookup = this._lookupFunctions.get(prefix) - /** - * Unsubscribe fetch protocol handler. - * - * @param {Libp2p} node - */ - static unmount (node) { - node.unhandle(`/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`) + if (existingLookup !== lookup) { + return + } + } + + this._lookupFunctions.delete(prefix) } } diff --git a/src/index.js b/src/index.js index edb715442b..23ca28b960 100644 --- a/src/index.js +++ b/src/index.js @@ -31,6 +31,7 @@ const PubsubAdapter = require('./pubsub-adapter') const Registrar = require('./registrar') const ping = require('./ping') const IdentifyService = require('./identify') +const FetchService = require('./fetch') const NatManager = require('./nat-manager') const { updateSelfPeerRecord } = require('./record/utils') @@ -323,6 +324,8 @@ class Libp2p extends EventEmitter { ping.mount(this) this._onDiscoveryPeer = this._onDiscoveryPeer.bind(this) + + this.fetchService = new FetchService(this) } /** @@ -356,6 +359,10 @@ class Libp2p extends EventEmitter { await this.handle(Object.values(IdentifyService.getProtocolStr(this)), this.identifyService.handleMessage) } + if (this.fetchService) { + await this.handle(FetchService.getProtocolStr(this), this.fetchService.handleMessage) + } + try { await this._onStarting() await this._onDidStart() @@ -407,6 +414,8 @@ class Libp2p extends EventEmitter { await this.natManager.stop() await this.transportManager.close() + this.unhandle(FetchService.getProtocolStr(this)) + ping.unmount(this) this.dialer.destroy() } catch (/** @type {any} */ err) { diff --git a/test/fetch/fetch.node.js b/test/fetch/fetch.node.js index 48dc4a7b5e..bbea020aa0 100644 --- a/test/fetch/fetch.node.js +++ b/test/fetch/fetch.node.js @@ -8,8 +8,8 @@ const Mplex = require('libp2p-mplex') const { NOISE } = require('@chainsafe/libp2p-noise') const MDNS = require('libp2p-mdns') const { createPeerId } = require('../utils/creators/peer') -const Fetch = require('../../src/fetch') const { codes } = require('../../src/errors') +const { Multiaddr } = require('multiaddr') async function createLibp2pNode (peerId) { return await Libp2p.create({ @@ -27,7 +27,9 @@ async function createLibp2pNode (peerId) { } describe('Fetch', () => { + /** @type {Libp2p} */ let sender + /** @type {Libp2p} */ let receiver const PREFIX_A = '/moduleA/' const PREFIX_B = '/moduleB/' @@ -45,69 +47,69 @@ describe('Fetch', () => { } } - before(async () => { + beforeEach(async () => { const [peerIdA, peerIdB] = await createPeerId({ number: 2 }) sender = await createLibp2pNode(peerIdA) receiver = await createLibp2pNode(peerIdB) await sender.start() await receiver.start() + + await Promise.all([ + ...sender.multiaddrs.map(addr => receiver.dial(addr.encapsulate(new Multiaddr(`/p2p/${sender.peerId}`)))), + ...receiver.multiaddrs.map(addr => sender.dial(addr.encapsulate(new Multiaddr(`/p2p/${receiver.peerId}`)))) + ]) }) - after(async () => { + afterEach(async () => { + receiver.fetchService.unregisterLookupFunction(PREFIX_A) + receiver.fetchService.unregisterLookupFunction(PREFIX_B) + await sender.stop() await receiver.stop() }) it('fetch key that exists in receivers datastore', async () => { - const fetch = new Fetch() - fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) - fetch.mount(receiver) + receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) - const rawData = await Fetch.fetch(sender, receiver.peerId, '/moduleA/foobar') + const rawData = await sender.fetchService.fetch(receiver.peerId, '/moduleA/foobar') const value = (new TextDecoder()).decode(rawData) expect(value).to.equal('hello world') }) it('Different lookups for different prefixes', async () => { - const fetch = new Fetch() - fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) - fetch.registerLookupFunction(PREFIX_B, generateLookupFunction(PREFIX_B, DATA_B)) - fetch.mount(receiver) + receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) + receiver.fetchService.registerLookupFunction(PREFIX_B, generateLookupFunction(PREFIX_B, DATA_B)) - const rawDataA = await Fetch.fetch(sender, receiver.peerId, '/moduleA/foobar') + const rawDataA = await sender.fetchService.fetch(receiver.peerId, '/moduleA/foobar') const valueA = (new TextDecoder()).decode(rawDataA) expect(valueA).to.equal('hello world') // Different lookup functions can be registered on different prefixes, and have different // values for the same key underneath the different prefix. - const rawDataB = await Fetch.fetch(sender, receiver.peerId, '/moduleB/foobar') + const rawDataB = await sender.fetchService.fetch(receiver.peerId, '/moduleB/foobar') const valueB = (new TextDecoder()).decode(rawDataB) expect(valueB).to.equal('goodnight moon') }) it('fetch key that does not exist in receivers datastore', async () => { - const fetch = new Fetch() - fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) - fetch.mount(receiver) - const result = await Fetch.fetch(sender, receiver.peerId, '/moduleA/garbage') + receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) + const result = await sender.fetchService.fetch(receiver.peerId, '/moduleA/garbage') expect(result).to.equal(null) }) it('fetch key with unknown prefix throws error', async () => { - const fetch = new Fetch() - fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) - fetch.mount(receiver) + receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) - await expect(Fetch.fetch(sender, receiver.peerId, '/moduleUNKNOWN/foobar')).to.eventually.be.rejected.with.property('code', codes.ERR_INVALID_PARAMETERS) + await expect(sender.fetchService.fetch(receiver.peerId, '/moduleUNKNOWN/foobar')) + .to.eventually.be.rejected.with.property('code', codes.ERR_INVALID_PARAMETERS) }) it('registering multiple handlers for same prefix errors', async () => { - const fetch = new Fetch() - fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) + receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) - expect(() => fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_B))) + expect(() => receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_B))) .to.throw().with.property('code', codes.ERR_KEY_ALREADY_EXISTS) }) }) From 30f7e59b2506e005b73961e7677ebc0d3109aa8d Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 24 Jan 2022 15:54:07 +0000 Subject: [PATCH 18/24] chore: make fetch command top level --- src/fetch/README.md | 2 +- src/index.js | 11 +++++++++ test/fetch/fetch.node.js | 50 ++++++++++++++++++++++++++++++++++++---- 3 files changed, 57 insertions(+), 6 deletions(-) diff --git a/src/fetch/README.md b/src/fetch/README.md index f71bd54bd8..7ea9997a5e 100644 --- a/src/fetch/README.md +++ b/src/fetch/README.md @@ -32,5 +32,5 @@ libp2p.fetchService.registerLookupFunction('/my_subsystem_key_prefix/', my_subsy const key = '/my_subsystem_key_prefix/{...}' const peerDst = PeerId.parse('Qmfoo...') // or Multiaddr instance -const value = await libp2p.fetchService.fetch(libp2p, peerDst, key) +const value = await libp2p.fetch(peerDst, key) ``` diff --git a/src/index.js b/src/index.js index 23ca28b960..4844c340ef 100644 --- a/src/index.js +++ b/src/index.js @@ -568,6 +568,17 @@ class Libp2p extends EventEmitter { ) } + /** + * Sends a request to fetch the value associated with the given key from the given peer. + * + * @param {PeerId|Multiaddr} peer + * @param {string} key + * @returns {Promise} + */ + fetch (peer, key) { + return this.fetchService.fetch(peer, key) + } + /** * Pings the given peer in order to obtain the operation latency. * diff --git a/test/fetch/fetch.node.js b/test/fetch/fetch.node.js index bbea020aa0..da78495951 100644 --- a/test/fetch/fetch.node.js +++ b/test/fetch/fetch.node.js @@ -72,7 +72,7 @@ describe('Fetch', () => { it('fetch key that exists in receivers datastore', async () => { receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) - const rawData = await sender.fetchService.fetch(receiver.peerId, '/moduleA/foobar') + const rawData = await sender.fetch(receiver.peerId, '/moduleA/foobar') const value = (new TextDecoder()).decode(rawData) expect(value).to.equal('hello world') }) @@ -81,20 +81,20 @@ describe('Fetch', () => { receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) receiver.fetchService.registerLookupFunction(PREFIX_B, generateLookupFunction(PREFIX_B, DATA_B)) - const rawDataA = await sender.fetchService.fetch(receiver.peerId, '/moduleA/foobar') + const rawDataA = await sender.fetch(receiver.peerId, '/moduleA/foobar') const valueA = (new TextDecoder()).decode(rawDataA) expect(valueA).to.equal('hello world') // Different lookup functions can be registered on different prefixes, and have different // values for the same key underneath the different prefix. - const rawDataB = await sender.fetchService.fetch(receiver.peerId, '/moduleB/foobar') + const rawDataB = await sender.fetch(receiver.peerId, '/moduleB/foobar') const valueB = (new TextDecoder()).decode(rawDataB) expect(valueB).to.equal('goodnight moon') }) it('fetch key that does not exist in receivers datastore', async () => { receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) - const result = await sender.fetchService.fetch(receiver.peerId, '/moduleA/garbage') + const result = await sender.fetch(receiver.peerId, '/moduleA/garbage') expect(result).to.equal(null) }) @@ -102,7 +102,7 @@ describe('Fetch', () => { it('fetch key with unknown prefix throws error', async () => { receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) - await expect(sender.fetchService.fetch(receiver.peerId, '/moduleUNKNOWN/foobar')) + await expect(sender.fetch(receiver.peerId, '/moduleUNKNOWN/foobar')) .to.eventually.be.rejected.with.property('code', codes.ERR_INVALID_PARAMETERS) }) @@ -112,4 +112,44 @@ describe('Fetch', () => { expect(() => receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_B))) .to.throw().with.property('code', codes.ERR_KEY_ALREADY_EXISTS) }) + + it('can unregister handler', async () => { + const lookupFunction = generateLookupFunction(PREFIX_A, DATA_A) + receiver.fetchService.registerLookupFunction(PREFIX_A, lookupFunction) + const rawDataA = await sender.fetch(receiver.peerId, '/moduleA/foobar') + const valueA = (new TextDecoder()).decode(rawDataA) + expect(valueA).to.equal('hello world') + + receiver.fetchService.unregisterLookupFunction(PREFIX_A, lookupFunction) + + await expect(sender.fetch(receiver.peerId, '/moduleA/foobar')) + .to.eventually.be.rejectedWith(/No lookup function registered for key/) + }) + + it('can unregister all handlers', async () => { + const lookupFunction = generateLookupFunction(PREFIX_A, DATA_A) + receiver.fetchService.registerLookupFunction(PREFIX_A, lookupFunction) + const rawDataA = await sender.fetch(receiver.peerId, '/moduleA/foobar') + const valueA = (new TextDecoder()).decode(rawDataA) + expect(valueA).to.equal('hello world') + + receiver.fetchService.unregisterLookupFunction(PREFIX_A) + + await expect(sender.fetch(receiver.peerId, '/moduleA/foobar')) + .to.eventually.be.rejectedWith(/No lookup function registered for key/) + }) + + it('does not unregister wrong handlers', async () => { + const lookupFunction = generateLookupFunction(PREFIX_A, DATA_A) + receiver.fetchService.registerLookupFunction(PREFIX_A, lookupFunction) + const rawDataA = await sender.fetch(receiver.peerId, '/moduleA/foobar') + const valueA = (new TextDecoder()).decode(rawDataA) + expect(valueA).to.equal('hello world') + + receiver.fetchService.unregisterLookupFunction(PREFIX_A, () => {}) + + const rawDataB = await sender.fetch(receiver.peerId, '/moduleA/foobar') + const valueB = (new TextDecoder()).decode(rawDataB) + expect(valueB).to.equal('hello world') + }) }) From da00c4d8f0282eae7b702e4edef1f07f568f6b21 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 24 Jan 2022 15:57:31 +0000 Subject: [PATCH 19/24] chore: document fetch method --- doc/API.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/doc/API.md b/doc/API.md index 96fc9fc62b..7623f81840 100644 --- a/doc/API.md +++ b/doc/API.md @@ -12,6 +12,7 @@ * [`handle`](#handle) * [`unhandle`](#unhandle) * [`ping`](#ping) + * [`fetch`](#fetch) * [`multiaddrs`](#multiaddrs) * [`addressManager.getListenAddrs`](#addressmanagergetlistenaddrs) * [`addressManager.getAnnounceAddrs`](#addressmanagergetannounceaddrs) @@ -455,6 +456,25 @@ Pings a given peer and get the operation's latency. const latency = await libp2p.ping(otherPeerId) ``` +## fetch + +Fetch a value from a remote node + +`libp2p.fetch(peer, key)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| peer | [`PeerId`][peer-id]\|[`Multiaddr`][multiaddr]\|`string` | peer to ping | +| key | `string` | A key that corresponds to a value on the remote node | + +#### Returns + +| Type | Description | +|------|-------------| +| `Promise` | The value for the key or null if it cannot be found | + ## multiaddrs Gets the multiaddrs the libp2p node announces to the network. This computes the advertising multiaddrs From a005361b206d6c602b99603bf7d28e15075f3c80 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 24 Jan 2022 16:08:51 +0000 Subject: [PATCH 20/24] chore: document register/unregister --- doc/API.md | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/doc/API.md b/doc/API.md index 7623f81840..fd2cd82452 100644 --- a/doc/API.md +++ b/doc/API.md @@ -13,6 +13,8 @@ * [`unhandle`](#unhandle) * [`ping`](#ping) * [`fetch`](#fetch) + * [`fetchService.registerLookupFunction`](#fetchserviceregisterlookupfunction) + * [`fetchService.unRegisterLookupFunction`](#fetchserviceunregisterlookupfunction) * [`multiaddrs`](#multiaddrs) * [`addressManager.getListenAddrs`](#addressmanagergetlistenaddrs) * [`addressManager.getAnnounceAddrs`](#addressmanagergetannounceaddrs) @@ -475,6 +477,53 @@ Fetch a value from a remote node |------|-------------| | `Promise` | The value for the key or null if it cannot be found | +#### Example + +```js +// ... +const value = await libp2p.fetch(otherPeerId, '/some/key') +``` + +## fetchService.registerLookupFunction + +Register a function to look up values requested by remote nodes + +`libp2p.fetchService.registerLookupFunction(prefix, lookup)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| prefix | `string` | All queries below this prefix will be passed to the lookup function | +| lookup | `(key: string) => Promise` | A function that takes a key and returns a Uint8Array or null | + +#### Example + +```js +// ... +const value = await libp2p.fetchService.registerLookupFunction('/prefix', (key) => { ... }) +``` + +## fetchService.unregisterLookupFunction + +Removes the passed lookup function or any function registered for the passed prefix + +`libp2p.fetchService.unregisterLookupFunction(prefix, lookup)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| prefix | `string` | All queries below this prefix will be passed to the lookup function | +| lookup | `(key: string) => Promise` | Optional: A function that takes a key and returns a Uint8Array or null | + +#### Example + +```js +// ... +libp2p.fetchService.unregisterLookupFunction('/prefix') +``` + ## multiaddrs Gets the multiaddrs the libp2p node announces to the network. This computes the advertising multiaddrs From 839a48758058ac2219cfeb630985eb8ba8e03cb6 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 24 Jan 2022 16:23:01 +0000 Subject: [PATCH 21/24] chore: put prepare back --- package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/package.json b/package.json index a0c2b6fba5..357ddd9213 100644 --- a/package.json +++ b/package.json @@ -42,6 +42,7 @@ "test:browser": "aegir test -t browser", "test:examples": "cd examples && npm run test:all", "test:interop": "LIBP2P_JS=$PWD npx aegir test -t node -f ./node_modules/libp2p-interop/test/*", + "prepare": "npm run build", "coverage": "nyc --reporter=text --reporter=lcov npm run test:node" }, "repository": { From 24e1ecbf008f1aa3b406c6a478f894c3189d716a Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 24 Jan 2022 16:24:13 +0000 Subject: [PATCH 22/24] chore: reuse existing error code --- src/errors.js | 3 +-- src/fetch/index.js | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/errors.js b/src/errors.js index cedb0cfcd0..61f308667e 100644 --- a/src/errors.js +++ b/src/errors.js @@ -60,6 +60,5 @@ exports.codes = { ERR_INVALID_NEW_PASS_TYPE: 'ERR_INVALID_NEW_PASS_TYPE', ERR_INVALID_PASS_LENGTH: 'ERR_INVALID_PASS_LENGTH', ERR_NOT_IMPLEMENTED: 'ERR_NOT_IMPLEMENTED', - ERR_WRONG_PING_ACK: 'ERR_WRONG_PING_ACK', - ERR_UNHANDLED_CASE: 'ERR_UNHANDLED_CASE' + ERR_WRONG_PING_ACK: 'ERR_WRONG_PING_ACK' } diff --git a/src/fetch/index.js b/src/fetch/index.js index dae970ea5a..0b5f55a6f5 100644 --- a/src/fetch/index.js +++ b/src/fetch/index.js @@ -78,7 +78,7 @@ class FetchProtocol { throw errCode(new Error('Error in fetch protocol response: ' + errmsg), codes.ERR_INVALID_PARAMETERS) } default: { - throw errCode(new Error('Unreachable case'), codes.ERR_UNHANDLED_CASE) + throw errCode(new Error('Unknown response status'), codes.ERR_INVALID_MESSAGE) } } } From d03018b362deab19cb66d987bf6d6ce5ec36521f Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 24 Jan 2022 16:42:15 +0000 Subject: [PATCH 23/24] chore: update fetch protocol name in line with spec --- src/fetch/constants.js | 5 ++--- src/fetch/index.js | 13 +++---------- src/index.js | 4 ++-- 3 files changed, 7 insertions(+), 15 deletions(-) diff --git a/src/fetch/constants.js b/src/fetch/constants.js index f9db7ea480..2c1044e535 100644 --- a/src/fetch/constants.js +++ b/src/fetch/constants.js @@ -1,7 +1,6 @@ 'use strict' module.exports = { - PROTOCOL: '/ipfs/fetch/0.0.1', // deprecated - PROTOCOL_VERSION: '0.0.1', - PROTOCOL_NAME: 'fetch' + // https://github.com/libp2p/specs/tree/master/fetch#wire-protocol + PROTOCOL: '/libp2p/fetch/0.0.1' } diff --git a/src/fetch/index.js b/src/fetch/index.js index 0b5f55a6f5..0db3abf7cd 100644 --- a/src/fetch/index.js +++ b/src/fetch/index.js @@ -10,8 +10,7 @@ const lp = require('it-length-prefixed') const { FetchRequest, FetchResponse } = require('./proto') // @ts-ignore it-handshake does not export types const handshake = require('it-handshake') - -const { PROTOCOL_NAME, PROTOCOL_VERSION } = require('./constants') +const { PROTOCOL } = require('./constants') /** * @typedef {import('../')} Libp2p @@ -28,12 +27,7 @@ const { PROTOCOL_NAME, PROTOCOL_VERSION } = require('./constants') * by a fixed prefix that all keys that should be routed to that lookup function will start with. */ class FetchProtocol { - /** - * @param {Libp2p} libp2p - */ - static getProtocolStr (libp2p) { - return `/${libp2p._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` - } + static PROTOCOL = PROTOCOL /** * @param {Libp2p} libp2p @@ -41,7 +35,6 @@ class FetchProtocol { constructor (libp2p) { this._lookupFunctions = new Map() // Maps key prefix to value lookup function this._libp2p = libp2p - this._protocol = `/${this._libp2p._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` this.handleMessage = this.handleMessage.bind(this) } @@ -57,7 +50,7 @@ class FetchProtocol { log('dialing %s to %s', this._protocol, peer.toB58String ? peer.toB58String() : peer) const connection = await this._libp2p.dial(peer) - const { stream } = await connection.newStream(this._protocol) + const { stream } = await connection.newStream(FetchProtocol.PROTOCOL) const shake = handshake(stream) // send message diff --git a/src/index.js b/src/index.js index 4844c340ef..966eb77bbd 100644 --- a/src/index.js +++ b/src/index.js @@ -360,7 +360,7 @@ class Libp2p extends EventEmitter { } if (this.fetchService) { - await this.handle(FetchService.getProtocolStr(this), this.fetchService.handleMessage) + await this.handle(FetchService.PROTOCOL, this.fetchService.handleMessage) } try { @@ -414,7 +414,7 @@ class Libp2p extends EventEmitter { await this.natManager.stop() await this.transportManager.close() - this.unhandle(FetchService.getProtocolStr(this)) + this.unhandle(FetchService.PROTOCOL) ping.unmount(this) this.dialer.destroy() From bb04a1978a22d520741f987799c1af40b05ba7cd Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 24 Jan 2022 16:52:50 +0000 Subject: [PATCH 24/24] chore: linting --- src/fetch/index.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/fetch/index.js b/src/fetch/index.js index 0db3abf7cd..ef8c37f153 100644 --- a/src/fetch/index.js +++ b/src/fetch/index.js @@ -27,8 +27,6 @@ const { PROTOCOL } = require('./constants') * by a fixed prefix that all keys that should be routed to that lookup function will start with. */ class FetchProtocol { - static PROTOCOL = PROTOCOL - /** * @param {Libp2p} libp2p */ @@ -156,4 +154,6 @@ class FetchProtocol { } } +FetchProtocol.PROTOCOL = PROTOCOL + exports = module.exports = FetchProtocol