diff --git a/src/circuit/auto-relay.js b/src/circuit/auto-relay.js index 5617e94eff..bec8c3a8f0 100644 --- a/src/circuit/auto-relay.js +++ b/src/circuit/auto-relay.js @@ -143,8 +143,7 @@ class AutoRelay { try { await this._transportManager.listen([multiaddr(listenAddr)]) - // TODO: push announce multiaddrs update - // await this._libp2p.identifyService.pushToPeerStore() + // Announce multiaddrs will update on listen success by TransportManager event being triggered } catch (err) { log.error(err) this._listenRelays.delete(id) diff --git a/src/circuit/listener.js b/src/circuit/listener.js index 2790918116..b10c2b1327 100644 --- a/src/circuit/listener.js +++ b/src/circuit/listener.js @@ -20,8 +20,8 @@ module.exports = (libp2p) => { const deleted = listeningAddrs.delete(connection.remotePeer.toB58String()) if (deleted) { - // TODO push announce multiaddrs update - // libp2p.identifyService.pushToPeerStore() + // Announce listen addresses change + listener.emit('close') } }) diff --git a/src/identify/index.js b/src/identify/index.js index 5bd4a51ba0..e8bea2f46f 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -63,12 +63,6 @@ class IdentifyService { */ this.connectionManager = libp2p.connectionManager - this.connectionManager.on('peer:connect', (connection) => { - const peerId = connection.remotePeer - - this.identify(connection, peerId).catch(log.error) - }) - /** * @property {PeerId} */ @@ -82,6 +76,19 @@ class IdentifyService { this._protocols = protocols this.handleMessage = this.handleMessage.bind(this) + + this.connectionManager.on('peer:connect', (connection) => { + const peerId = connection.remotePeer + + this.identify(connection, peerId).catch(log.error) + }) + + // When self multiaddrs change, trigger identify-push + this.peerStore.on('change:multiaddrs', ({ peerId }) => { + if (peerId.toString() === this.peerId.toString()) { + this.pushToPeerStore() + } + }) } /** @@ -90,7 +97,7 @@ class IdentifyService { * @returns {Promise} */ async push (connections) { - const signedPeerRecord = await this._getSelfPeerRecord() + const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId) const listenAddrs = this._libp2p.multiaddrs.map((ma) => ma.bytes) const protocols = Array.from(this._protocols.keys()) @@ -239,7 +246,7 @@ class IdentifyService { publicKey = this.peerId.pubKey.bytes } - const signedPeerRecord = await this._getSelfPeerRecord() + const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId) const message = Message.encode({ protocolVersion: PROTOCOL_VERSION, @@ -308,33 +315,6 @@ class IdentifyService { // Update the protocols this.peerStore.protoBook.set(id, message.protocols) } - - /** - * Get self signed peer record raw envelope. - * @return {Uint8Array} - */ - async _getSelfPeerRecord () { - const selfSignedPeerRecord = this.peerStore.addressBook.getRawEnvelope(this.peerId) - - // TODO: support invalidation when dynamic multiaddrs are supported - if (selfSignedPeerRecord) { - return selfSignedPeerRecord - } - - try { - const peerRecord = new PeerRecord({ - peerId: this.peerId, - multiaddrs: this._libp2p.multiaddrs - }) - const envelope = await Envelope.seal(peerRecord, this.peerId) - this.peerStore.addressBook.consumePeerRecord(envelope) - - return this.peerStore.addressBook.getRawEnvelope(this.peerId) - } catch (err) { - log.error('failed to get self peer record') - } - return null - } } module.exports.IdentifyService = IdentifyService diff --git a/src/index.js b/src/index.js index 1b2f65fc1d..806e80d47e 100644 --- a/src/index.js +++ b/src/index.js @@ -247,6 +247,7 @@ class Libp2p extends EventEmitter { log('libp2p is stopping') try { + this._isStarted = false for (const service of this._discovery.values()) { service.removeListener('peer', this._onDiscoveryPeer) } @@ -274,7 +275,6 @@ class Libp2p extends EventEmitter { this.emit('error', err) } } - this._isStarted = false log('libp2p has stopped') } diff --git a/src/record/utils.js b/src/record/utils.js new file mode 100644 index 0000000000..509fea7eec --- /dev/null +++ b/src/record/utils.js @@ -0,0 +1,20 @@ +'use strict' + +const Envelope = require('./envelope') +const PeerRecord = require('./peer-record') + +/** + * Create (or update if existing) self peer record and store it in the AddressBook. + * @param {libp2p} libp2p + * @returns {Promise} + */ +async function updateSelfPeerRecord (libp2p) { + const peerRecord = new PeerRecord({ + peerId: libp2p.peerId, + multiaddrs: libp2p.multiaddrs + }) + const envelope = await Envelope.seal(peerRecord, libp2p.peerId) + libp2p.peerStore.addressBook.consumePeerRecord(envelope) +} + +module.exports.updateSelfPeerRecord = updateSelfPeerRecord diff --git a/src/transport-manager.js b/src/transport-manager.js index 5030c594b5..0467b8ead9 100644 --- a/src/transport-manager.js +++ b/src/transport-manager.js @@ -7,6 +7,8 @@ const debug = require('debug') const log = debug('libp2p:transports') log.error = debug('libp2p:transports:error') +const { updateSelfPeerRecord } = require('./record/utils') + class TransportManager { /** * @constructor @@ -62,6 +64,8 @@ class TransportManager { log('closing listeners for %s', key) while (listeners.length) { const listener = listeners.pop() + listener.removeAllListeners('listening') + listener.removeAllListeners('close') tasks.push(listener.close()) } } @@ -150,6 +154,10 @@ class TransportManager { const listener = transport.createListener({}, this.onConnection) this._listeners.get(key).push(listener) + // Track listen/close events + listener.on('listening', () => updateSelfPeerRecord(this.libp2p)) + listener.on('close', () => updateSelfPeerRecord(this.libp2p)) + // We need to attempt to listen on everything tasks.push(listener.listen(addr)) } @@ -194,6 +202,8 @@ class TransportManager { if (this._listeners.has(key)) { // Close any running listeners for (const listener of this._listeners.get(key)) { + listener.removeAllListeners('listening') + listener.removeAllListeners('close') await listener.close() } } diff --git a/test/dialing/direct.node.js b/test/dialing/direct.node.js index 401f4eb501..a5407f66e6 100644 --- a/test/dialing/direct.node.js +++ b/test/dialing/direct.node.js @@ -40,21 +40,28 @@ describe('Dialing (direct, TCP)', () => { let peerStore let remoteAddr - before(async () => { - const [remotePeerId] = await Promise.all([ - PeerId.createFromJSON(Peers[0]) + beforeEach(async () => { + const [localPeerId, remotePeerId] = await Promise.all([ + PeerId.createFromJSON(Peers[0]), + PeerId.createFromJSON(Peers[1]) ]) + + peerStore = new PeerStore({ peerId: remotePeerId }) remoteTM = new TransportManager({ libp2p: { - addressManager: new AddressManager({ listen: [listenAddr] }) + addressManager: new AddressManager({ listen: [listenAddr] }), + peerId: remotePeerId, + peerStore }, upgrader: mockUpgrader }) remoteTM.add(Transport.prototype[Symbol.toStringTag], Transport) - peerStore = new PeerStore({ peerId: remotePeerId }) localTM = new TransportManager({ - libp2p: {}, + libp2p: { + peerId: localPeerId, + peerStore: new PeerStore({ peerId: localPeerId }) + }, upgrader: mockUpgrader }) localTM.add(Transport.prototype[Symbol.toStringTag], Transport) @@ -64,7 +71,7 @@ describe('Dialing (direct, TCP)', () => { remoteAddr = remoteTM.getAddrs()[0].encapsulate(`/p2p/${remotePeerId.toB58String()}`) }) - after(() => remoteTM.close()) + afterEach(() => remoteTM.close()) afterEach(() => { sinon.restore() @@ -110,7 +117,7 @@ describe('Dialing (direct, TCP)', () => { peerStore }) - peerStore.addressBook.set(peerId, [remoteAddr]) + peerStore.addressBook.set(peerId, remoteTM.getAddrs()) const connection = await dialer.connectToPeer(peerId) expect(connection).to.exist() diff --git a/test/dialing/direct.spec.js b/test/dialing/direct.spec.js index e3d60e5960..c0aacaad78 100644 --- a/test/dialing/direct.spec.js +++ b/test/dialing/direct.spec.js @@ -354,7 +354,6 @@ describe('Dialing (direct, WebSockets)', () => { const connection = await libp2p.dial(remoteAddr) expect(connection).to.exist() - sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord') sinon.spy(libp2p.peerStore.protoBook, 'set') // Wait for onConnection to be called @@ -363,8 +362,6 @@ describe('Dialing (direct, WebSockets)', () => { expect(libp2p.identifyService.identify.callCount).to.equal(1) await libp2p.identifyService.identify.firstCall.returnValue - // Self + New peer - expect(libp2p.peerStore.addressBook.consumePeerRecord.callCount).to.equal(2) expect(libp2p.peerStore.protoBook.set.callCount).to.equal(1) }) diff --git a/test/identify/index.spec.js b/test/identify/index.spec.js index 799a3ebd49..e1b741a224 100644 --- a/test/identify/index.spec.js +++ b/test/identify/index.spec.js @@ -8,7 +8,6 @@ const { expect } = chai const sinon = require('sinon') const { EventEmitter } = require('events') -const delay = require('delay') const PeerId = require('peer-id') const duplexPair = require('it-pair/duplex') const multiaddr = require('multiaddr') @@ -22,6 +21,7 @@ const Libp2p = require('../../src') const Envelope = require('../../src/record/envelope') const PeerStore = require('../../src/peer-store') const baseOptions = require('../utils/base-options.browser') +const { updateSelfPeerRecord } = require('../../src/record/utils') const pkg = require('../../package.json') const { MULTIADDRS_WEBSOCKETS } = require('../fixtures/browser') @@ -78,6 +78,9 @@ describe('Identify', () => { sinon.spy(localIdentify.peerStore.addressBook, 'consumePeerRecord') sinon.spy(localIdentify.peerStore.protoBook, 'set') + // Transport Manager creates signed peer record + await updateSelfPeerRecord(remoteIdentify._libp2p) + // Run identify await Promise.all([ localIdentify.identify(localConnectionMock), @@ -239,6 +242,10 @@ describe('Identify', () => { sinon.spy(remoteIdentify.peerStore.addressBook, 'consumePeerRecord') sinon.spy(remoteIdentify.peerStore.protoBook, 'set') + // Transport Manager creates signed peer record + await updateSelfPeerRecord(localIdentify._libp2p) + await updateSelfPeerRecord(remoteIdentify._libp2p) + // Run identify await Promise.all([ localIdentify.push([localConnectionMock]), @@ -249,7 +256,7 @@ describe('Identify', () => { }) ]) - expect(remoteIdentify.peerStore.addressBook.consumePeerRecord.callCount).to.equal(1) + expect(remoteIdentify.peerStore.addressBook.consumePeerRecord.callCount).to.equal(2) expect(remoteIdentify.peerStore.protoBook.set.callCount).to.equal(1) const addresses = localIdentify.peerStore.addressBook.get(localPeer) @@ -359,8 +366,8 @@ describe('Identify', () => { expect(connection).to.exist() // Wait for peer store to be updated - // Dialer._createDialTarget (add), Identify (consume), Create self (consume) - await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 2 && peerStoreSpyAdd.callCount === 1) + // Dialer._createDialTarget (add), Identify (consume) + await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1) expect(libp2p.identifyService.identify.callCount).to.equal(1) // The connection should have no open streams @@ -381,8 +388,6 @@ describe('Identify', () => { const connection = await libp2p.dialer.connectToPeer(remoteAddr) expect(connection).to.exist() - // Wait for nextTick to trigger the identify call - await delay(1) // Wait for identify to finish await libp2p.identifyService.identify.firstCall.returnValue @@ -404,5 +409,39 @@ describe('Identify', () => { // Verify the streams close await pWaitFor(() => connection.streams.length === 0) }) + + it('should push multiaddr updates to an already connected peer', async () => { + libp2p = new Libp2p({ + ...baseOptions, + peerId + }) + + await libp2p.start() + + sinon.spy(libp2p.identifyService, 'identify') + sinon.spy(libp2p.identifyService, 'push') + + const connection = await libp2p.dialer.connectToPeer(remoteAddr) + expect(connection).to.exist() + + // Wait for identify to finish + await libp2p.identifyService.identify.firstCall.returnValue + sinon.stub(libp2p, 'isStarted').returns(true) + + libp2p.peerStore.addressBook.add(libp2p.peerId, [multiaddr('/ip4/180.0.0.1/tcp/15001/ws')]) + + // Verify the remote peer is notified of change + expect(libp2p.identifyService.push.callCount).to.equal(1) + for (const call of libp2p.identifyService.push.getCalls()) { + const [connections] = call.args + expect(connections.length).to.equal(1) + expect(connections[0].remotePeer.toB58String()).to.equal(remoteAddr.getPeerId()) + const results = await call.returnValue + expect(results.length).to.equal(1) + } + + // Verify the streams close + await pWaitFor(() => connection.streams.length === 0) + }) }) }) diff --git a/test/relay/auto-relay.node.js b/test/relay/auto-relay.node.js index 2a4ba20d57..96f94cd7bb 100644 --- a/test/relay/auto-relay.node.js +++ b/test/relay/auto-relay.node.js @@ -259,6 +259,9 @@ describe('auto-relay', () => { it('should not listen on a relayed address if peer disconnects', async () => { const originalMultiaddrs1Length = relayLibp2p1.multiaddrs.length + // Spy if identify push is fired on adding/removing listen addr + sinon.spy(relayLibp2p1.identifyService, 'pushToPeerStore') + // Discover one relay and connect relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs) await relayLibp2p1.dial(relayLibp2p2.peerId) @@ -268,8 +271,8 @@ describe('auto-relay', () => { expect(autoRelay1._listenRelays.size).to.equal(1) expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p2.peerId.toB58String()) - // Spy if identify push is fired - sinon.spy(relayLibp2p1.identifyService, 'pushToPeerStore') + // Identify push for adding listen relay multiaddr + expect(relayLibp2p1.identifyService.pushToPeerStore.callCount).to.equal(1) // Disconnect from peer used for relay await relayLibp2p1.hangUp(relayLibp2p2.peerId) @@ -277,7 +280,9 @@ describe('auto-relay', () => { // Wait for removed listening on the relay await pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length) expect(autoRelay1._listenRelays.size).to.equal(0) - // TODO: identify-push expect(relayLibp2p1.identifyService.pushToPeerStore.callCount).to.equal(1) + + // Identify push for removing listen relay multiaddr + expect(relayLibp2p1.identifyService.pushToPeerStore.callCount).to.equal(2) }) it('should try to listen on other connected peers relayed address if one used relay disconnects', async () => { diff --git a/test/transports/transport-manager.node.js b/test/transports/transport-manager.node.js index 6e8cc12aea..e123c5a370 100644 --- a/test/transports/transport-manager.node.js +++ b/test/transports/transport-manager.node.js @@ -7,9 +7,13 @@ const { expect } = chai const AddressManager = require('../../src/address-manager') const TransportManager = require('../../src/transport-manager') +const PeerStore = require('../../src/peer-store') +const PeerRecord = require('../../src/record/peer-record') const Transport = require('libp2p-tcp') +const PeerId = require('peer-id') const multiaddr = require('multiaddr') const mockUpgrader = require('../utils/mockUpgrader') +const Peers = require('../fixtures/peers') const addrs = [ multiaddr('/ip4/127.0.0.1/tcp/0'), multiaddr('/ip4/127.0.0.1/tcp/0') @@ -17,11 +21,19 @@ const addrs = [ describe('Transport Manager (TCP)', () => { let tm + let localPeer - before(() => { + before(async () => { + localPeer = await PeerId.createFromJSON(Peers[0]) + }) + + beforeEach(() => { tm = new TransportManager({ libp2p: { - addressManager: new AddressManager({ listen: addrs }) + peerId: localPeer, + multiaddrs: addrs, + addressManager: new AddressManager({ listen: addrs }), + peerStore: new PeerStore({ peerId: localPeer }) }, upgrader: mockUpgrader, onConnection: () => {} @@ -44,12 +56,32 @@ describe('Transport Manager (TCP)', () => { await tm.listen(addrs) expect(tm._listeners).to.have.key(Transport.prototype[Symbol.toStringTag]) expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(addrs.length) + // Ephemeral ip addresses may result in multiple listeners expect(tm.getAddrs().length).to.equal(addrs.length) await tm.close() expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(0) }) + it('should create self signed peer record on listen', async () => { + let signedPeerRecord = await tm.libp2p.peerStore.addressBook.getPeerRecord(localPeer) + expect(signedPeerRecord).to.not.exist() + + tm.add(Transport.prototype[Symbol.toStringTag], Transport) + await tm.listen(addrs) + + // Should created Self Peer record on new listen address + signedPeerRecord = await tm.libp2p.peerStore.addressBook.getPeerRecord(localPeer) + expect(signedPeerRecord).to.exist() + + const record = PeerRecord.createFromProtobuf(signedPeerRecord.payload) + expect(record).to.exist() + expect(record.multiaddrs.length).to.equal(addrs.length) + addrs.forEach((a, i) => { + expect(record.multiaddrs[i].equals(a)).to.be.true() + }) + }) + it('should be able to dial', async () => { tm.add(Transport.prototype[Symbol.toStringTag], Transport) await tm.listen(addrs)