diff --git a/test/go-gossipsub.ts b/test/go-gossipsub.ts index be0b6f0b..335be350 100644 --- a/test/go-gossipsub.ts +++ b/test/go-gossipsub.ts @@ -25,6 +25,7 @@ import { tearDownGossipsubs, createPeers } from './utils' +import PeerId from 'peer-id' /** * These tests were translated from: @@ -36,6 +37,35 @@ chai.use(require('dirty-chai')) EventEmitter.defaultMaxListeners = 100 +const checkReceivedSubscription = (psub: Gossipsub, peerIdStr: string, topic: string, peerIdx: number, timeout = 1000) => new Promise ((resolve, reject) => { + const event = 'pubsub:subscription-change' + let cb: (peerId: PeerId, subs: RPC.ISubOpts[]) => void + const t = setTimeout(() => reject(`Not received subscriptions of psub ${peerIdx}`), timeout) + cb = (peerId, subs) => { + if (peerId.toB58String() === peerIdStr && subs[0].topicID === topic && subs[0].subscribe === true) { + clearTimeout(t) + psub.off(event, cb) + if (Array.from(psub.topics.get(topic) || []).includes(peerIdStr)) { + resolve() + } else { + reject(Error('topics should include the peerId')) + } + } + } + psub.on(event, cb); +}); + +const checkReceivedSubscriptions = async (psub: Gossipsub, peerIdStrs: string[], topic: string) => { + const recvPeerIdStrs = peerIdStrs.filter((peerIdStr) => peerIdStr !== psub.peerId.toB58String()) + const promises = recvPeerIdStrs.map((peerIdStr, idx) => checkReceivedSubscription(psub, peerIdStr, topic, idx)) + await Promise.all(promises) + expect(Array.from(psub.topics.get(topic) || []).sort()).to.be.deep.equal(recvPeerIdStrs.sort()) + recvPeerIdStrs.forEach((peerIdStr) => { + const peerStream = psub.peers.get(peerIdStr) + expect(peerStream && peerStream.isWritable, "no peerstream or peerstream is not writable").to.be.true + }) +} + /** * Given a topic and data (and debug metadata -- sender index and msg index) * Return a function (takes a gossipsub (and receiver index)) @@ -80,7 +110,7 @@ const awaitEvents = (emitter: EventEmitter, event: string, number: number, timeo }) } -describe.skip('go-libp2p-pubsub gossipsub tests', function () { +describe('go-libp2p-pubsub gossipsub tests', function () { this.timeout(100000) afterEach(() => { sinon.restore() @@ -663,26 +693,40 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () { await Promise.all(sendRecv) await tearDownGossipsubs(psubs) }) + it('test gossipsub multihops', async function () { // Create 6 gossipsub nodes // Connect nodes in a line (eg: 0 -> 1 -> 2 -> 3 ...) // Subscribe to the topic, all nodes // Publish a message from node 0 // Assert that the last node receives the message + const numPeers = 6 const psubs = await createGossipsubs({ - number: 6, + number: numPeers, options: { scoreParams: { IPColocationFactorThreshold: 20 } } }) const topic = 'foobar' - for (let i = 0; i < 5; i++) { + for (let i = 0; i < numPeers - 1; i++) { await psubs[i]._libp2p.dialProtocol(psubs[i + 1]._libp2p.peerId, psubs[i].multicodecs) } + const peerIdStrsByIdx: string[][] = [] + for (let i = 0; i < numPeers; i++) { + if (i === 0) { // first + peerIdStrsByIdx[i] = [psubs[i + 1].peerId.toB58String()] + } else if (i > 0 && i < numPeers - 1) { // middle + peerIdStrsByIdx[i] = [psubs[i + 1].peerId.toB58String(), psubs[i - 1].peerId.toB58String()] + } else if (i === numPeers - 1) { // last + peerIdStrsByIdx[i] = [psubs[i - 1].peerId.toB58String()] + } + } - psubs.forEach((ps) => ps.subscribe(topic)) + const subscriptionPromises = psubs.map((psub, i) => checkReceivedSubscriptions(psub, peerIdStrsByIdx[i], topic)) + psubs.forEach(ps => ps.subscribe(topic)) // wait for heartbeats to build mesh await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 2))) + await Promise.all(subscriptionPromises) const msg = uint8ArrayFromString(`${0} its not a flooooood ${0}`) const owner = 0 @@ -691,6 +735,7 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () { await results await tearDownGossipsubs(psubs) }) + it('test gossipsub tree topology', async function () { // Create 10 gossipsub nodes // Connect nodes in a tree, diagram below @@ -714,20 +759,39 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () { [8] -> [9] */ const multicodecs = psubs[0].multicodecs - await psubs[0]._libp2p.dialProtocol(psubs[1]._libp2p.peerId, multicodecs) - await psubs[1]._libp2p.dialProtocol(psubs[2]._libp2p.peerId, multicodecs) - await psubs[1]._libp2p.dialProtocol(psubs[4]._libp2p.peerId, multicodecs) - await psubs[2]._libp2p.dialProtocol(psubs[3]._libp2p.peerId, multicodecs) - await psubs[0]._libp2p.dialProtocol(psubs[5]._libp2p.peerId, multicodecs) - await psubs[5]._libp2p.dialProtocol(psubs[6]._libp2p.peerId, multicodecs) - await psubs[5]._libp2p.dialProtocol(psubs[8]._libp2p.peerId, multicodecs) - await psubs[6]._libp2p.dialProtocol(psubs[7]._libp2p.peerId, multicodecs) - await psubs[8]._libp2p.dialProtocol(psubs[9]._libp2p.peerId, multicodecs) + const treeTopology = [ + [1, 5], // 0 + [2, 4], // 1 + [3], // 2 + [], // 3 leaf + [], // 4 leaf + [6, 8], // 5 + [7], // 6 + [], // 7 leaf + [9], // 8 + [], // 9 leaf + ] + for (let from = 0; from < treeTopology.length; from++) { + for (let to of treeTopology[from]) { + await psubs[from]._libp2p.dialProtocol(psubs[to]._libp2p.peerId, multicodecs) + } + } + const getPeerIdStrs = (idx: number): string[] => { + const outbounds = treeTopology[idx] + const inbounds = [] + for (let i = 0; i < treeTopology.length; i++) { + if (treeTopology[i].includes(idx)) inbounds.push(i) + } + return Array.from(new Set([...inbounds, ...outbounds])).map((i) => psubs[i].peerId.toB58String()) + } + + const subscriptionPromises = psubs.map((psub, i) => checkReceivedSubscriptions(psub, getPeerIdStrs(i), topic)) psubs.forEach((ps) => ps.subscribe(topic)) // wait for heartbeats to build mesh await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 2))) + await Promise.all(subscriptionPromises) expectSet(new Set(psubs[0].peers.keys()), [psubs[1].peerId.toB58String(), psubs[5].peerId.toB58String()]) expectSet(new Set(psubs[1].peers.keys()), [ @@ -749,6 +813,7 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () { await Promise.all(sendRecv) await tearDownGossipsubs(psubs) }) + it('test gossipsub star topology with signed peer records', async function () { // Create 20 gossipsub nodes with lower degrees // Connect nodes to a center node, with the center having very low degree @@ -756,6 +821,7 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () { // Assert that all nodes have > 1 connection // Publish one message per node // Assert that the subscribed nodes receive every message + sinon.replace(constants, 'GossipsubPrunePeers', 5 as 16) const psubs = await createGossipsubs({ number: 20, options: { @@ -782,10 +848,13 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () { // build the mesh const topic = 'foobar' + const peerIdStrs = psubs.map((psub) => psub.peerId.toB58String()) + const subscriptionPromise = checkReceivedSubscriptions(psubs[0], peerIdStrs, topic) psubs.forEach((ps) => ps.subscribe(topic)) // wait a bit for the mesh to build await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 15, 25000))) + await subscriptionPromise // check that all peers have > 1 connection psubs.forEach((ps) => { @@ -806,6 +875,7 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () { await Promise.all(sendRecv) await tearDownGossipsubs(psubs) }) + it('test gossipsub direct peers', async function () { // Create 3 gossipsub nodes // 2 and 3 with direct peer connections with each other @@ -845,35 +915,43 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () { ] await Promise.all(psubs.map((ps) => ps.start())) const multicodecs = psubs[0].multicodecs + // each peer connects to 2 other peers + let connectPromises = libp2ps.map((libp2p) => awaitEvents(libp2p.connectionManager, 'peer:connect', 2)) await libp2ps[0].dialProtocol(libp2ps[1].peerId, multicodecs) await libp2ps[0].dialProtocol(libp2ps[2].peerId, multicodecs) - - // verify that the direct peers connected - await delay(2000) - expect(libp2ps[1].connectionManager.get(libp2ps[2].peerId)).to.be.ok + await Promise.all(connectPromises) const topic = 'foobar' - psubs.forEach((ps) => ps.subscribe(topic)) - - await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 1))) + const peerIdStrs = libp2ps.map((libp2p) => libp2p.peerId.toB58String()) + let subscriptionPromises = psubs.map((psub) => checkReceivedSubscriptions(psub, peerIdStrs, topic)) + psubs.forEach(ps => ps.subscribe(topic)) + await Promise.all(psubs.map(ps => awaitEvents(ps, 'gossipsub:heartbeat', 1))) + await Promise.all(subscriptionPromises) let sendRecv = [] for (let i = 0; i < 3; i++) { const msg = uint8ArrayFromString(`${i} its not a flooooood ${i}`) const owner = i const results = Promise.all( - psubs.filter((psub, j) => j !== owner).map(checkReceivedMessage(topic, msg, owner, i)) + psubs.filter((_, j) => j !== owner).map(checkReceivedMessage(topic, msg, owner, i)) ) sendRecv.push(psubs[owner].publish(topic, msg)) sendRecv.push(results) } await Promise.all(sendRecv) + connectPromises = [1, 2].map((i) => awaitEvents(libp2ps[i].connectionManager, 'peer:connect', 1)) // disconnect the direct peers to test reconnection - libp2ps[1].connectionManager.getAll(libp2ps[2].peerId).forEach((c) => c.close()) + // need more time to disconnect/connect/send subscriptions again + subscriptionPromises = [ + checkReceivedSubscription(psubs[1], peerIdStrs[2], topic, 2, 10000), + checkReceivedSubscription(psubs[2], peerIdStrs[1], topic, 1, 10000), + ] + await libp2ps[1].hangUp(libp2ps[2].peerId); await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 5))) - + await Promise.all(connectPromises) + await Promise.all(subscriptionPromises) expect(libp2ps[1].connectionManager.get(libp2ps[2].peerId)).to.be.ok sendRecv = [] @@ -889,14 +967,16 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () { await Promise.all(sendRecv) await tearDownGossipsubs(psubs) }) + it('test gossipsub flood publish', async function () { // Create 30 gossipsub nodes // Connect in star topology // Subscribe to the topic, all nodes // Publish 20 messages, each from the center node // Assert that the other nodes receive the message + const numPeers = 30; const psubs = await createGossipsubs({ - number: 30, + number: numPeers, options: { scoreParams: { IPColocationFactorThreshold: 30 } } }) @@ -906,17 +986,21 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () { }) ) + const owner = 0 + const psub0 = psubs[owner] + const peerIdStrs = psubs.filter((_, j) => j !== owner).map(psub => psub.peerId.toB58String()) // build the (partial, unstable) mesh const topic = 'foobar' + const subscriptionPromise = checkReceivedSubscriptions(psub0, peerIdStrs, topic) psubs.forEach((ps) => ps.subscribe(topic)) await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 1))) + await subscriptionPromise // send messages from the star and assert they were received let sendRecv = [] for (let i = 0; i < 20; i++) { const msg = uint8ArrayFromString(`${i} its not a flooooood ${i}`) - const owner = 0 const results = Promise.all( psubs.filter((psub, j) => j !== owner).map(checkReceivedMessage(topic, msg, owner, i)) ) @@ -926,6 +1010,7 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () { await Promise.all(sendRecv) await tearDownGossipsubs(psubs) }) + it('test gossipsub negative score', async function () { // Create 20 gossipsub nodes, with scoring params to quickly lower node 0's score // Connect densely