Skip to content
135 changes: 110 additions & 25 deletions test/go-gossipsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
tearDownGossipsubs,
createPeers
} from './utils'
import PeerId from 'peer-id'

/**
* These tests were translated from:
Expand All @@ -36,6 +37,35 @@ chai.use(require('dirty-chai'))

EventEmitter.defaultMaxListeners = 100

const checkReceivedSubscription = (psub: Gossipsub, peerIdStr: string, topic: string, peerIdx: number, timeout = 1000) => new Promise<void> ((resolve, reject) => {
const event = 'pubsub:subscription-change'
let cb: (peerId: PeerId, subs: RPC.ISubOpts[]) => void
const t = setTimeout(() => reject(`Not received subscriptions of psub ${peerIdx}`), timeout)
cb = (peerId, subs) => {
if (peerId.toB58String() === peerIdStr && subs[0].topicID === topic && subs[0].subscribe === true) {
clearTimeout(t)
psub.off(event, cb)
if (Array.from(psub.topics.get(topic) || []).includes(peerIdStr)) {
resolve()
} else {
reject(Error('topics should include the peerId'))
}
}
}
psub.on(event, cb);
});

const checkReceivedSubscriptions = async (psub: Gossipsub, peerIdStrs: string[], topic: string) => {
const recvPeerIdStrs = peerIdStrs.filter((peerIdStr) => peerIdStr !== psub.peerId.toB58String())
const promises = recvPeerIdStrs.map((peerIdStr, idx) => checkReceivedSubscription(psub, peerIdStr, topic, idx))
await Promise.all(promises)
expect(Array.from(psub.topics.get(topic) || []).sort()).to.be.deep.equal(recvPeerIdStrs.sort())
recvPeerIdStrs.forEach((peerIdStr) => {
const peerStream = psub.peers.get(peerIdStr)
expect(peerStream && peerStream.isWritable, "no peerstream or peerstream is not writable").to.be.true
})
}

/**
* Given a topic and data (and debug metadata -- sender index and msg index)
* Return a function (takes a gossipsub (and receiver index))
Expand Down Expand Up @@ -80,7 +110,7 @@ const awaitEvents = (emitter: EventEmitter, event: string, number: number, timeo
})
}

describe.skip('go-libp2p-pubsub gossipsub tests', function () {
describe('go-libp2p-pubsub gossipsub tests', function () {
this.timeout(100000)
afterEach(() => {
sinon.restore()
Expand Down Expand Up @@ -663,26 +693,40 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {
await Promise.all(sendRecv)
await tearDownGossipsubs(psubs)
})

it('test gossipsub multihops', async function () {
// Create 6 gossipsub nodes
// Connect nodes in a line (eg: 0 -> 1 -> 2 -> 3 ...)
// Subscribe to the topic, all nodes
// Publish a message from node 0
// Assert that the last node receives the message
const numPeers = 6
const psubs = await createGossipsubs({
number: 6,
number: numPeers,
options: { scoreParams: { IPColocationFactorThreshold: 20 } }
})
const topic = 'foobar'

for (let i = 0; i < 5; i++) {
for (let i = 0; i < numPeers - 1; i++) {
await psubs[i]._libp2p.dialProtocol(psubs[i + 1]._libp2p.peerId, psubs[i].multicodecs)
}
const peerIdStrsByIdx: string[][] = []
for (let i = 0; i < numPeers; i++) {
if (i === 0) { // first
peerIdStrsByIdx[i] = [psubs[i + 1].peerId.toB58String()]
} else if (i > 0 && i < numPeers - 1) { // middle
peerIdStrsByIdx[i] = [psubs[i + 1].peerId.toB58String(), psubs[i - 1].peerId.toB58String()]
} else if (i === numPeers - 1) { // last
peerIdStrsByIdx[i] = [psubs[i - 1].peerId.toB58String()]
}
}

psubs.forEach((ps) => ps.subscribe(topic))
const subscriptionPromises = psubs.map((psub, i) => checkReceivedSubscriptions(psub, peerIdStrsByIdx[i], topic))
psubs.forEach(ps => ps.subscribe(topic))

// wait for heartbeats to build mesh
await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 2)))
await Promise.all(subscriptionPromises)

const msg = uint8ArrayFromString(`${0} its not a flooooood ${0}`)
const owner = 0
Expand All @@ -691,6 +735,7 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {
await results
await tearDownGossipsubs(psubs)
})

it('test gossipsub tree topology', async function () {
// Create 10 gossipsub nodes
// Connect nodes in a tree, diagram below
Expand All @@ -714,20 +759,39 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {
[8] -> [9]
*/
const multicodecs = psubs[0].multicodecs
await psubs[0]._libp2p.dialProtocol(psubs[1]._libp2p.peerId, multicodecs)
await psubs[1]._libp2p.dialProtocol(psubs[2]._libp2p.peerId, multicodecs)
await psubs[1]._libp2p.dialProtocol(psubs[4]._libp2p.peerId, multicodecs)
await psubs[2]._libp2p.dialProtocol(psubs[3]._libp2p.peerId, multicodecs)
await psubs[0]._libp2p.dialProtocol(psubs[5]._libp2p.peerId, multicodecs)
await psubs[5]._libp2p.dialProtocol(psubs[6]._libp2p.peerId, multicodecs)
await psubs[5]._libp2p.dialProtocol(psubs[8]._libp2p.peerId, multicodecs)
await psubs[6]._libp2p.dialProtocol(psubs[7]._libp2p.peerId, multicodecs)
await psubs[8]._libp2p.dialProtocol(psubs[9]._libp2p.peerId, multicodecs)
const treeTopology = [
[1, 5], // 0
[2, 4], // 1
[3], // 2
[], // 3 leaf
[], // 4 leaf
[6, 8], // 5
[7], // 6
[], // 7 leaf
[9], // 8
[], // 9 leaf
]
for (let from = 0; from < treeTopology.length; from++) {
for (let to of treeTopology[from]) {
await psubs[from]._libp2p.dialProtocol(psubs[to]._libp2p.peerId, multicodecs)
}
}

const getPeerIdStrs = (idx: number): string[] => {
const outbounds = treeTopology[idx]
const inbounds = []
for (let i = 0; i < treeTopology.length; i++) {
if (treeTopology[i].includes(idx)) inbounds.push(i)
}
return Array.from(new Set([...inbounds, ...outbounds])).map((i) => psubs[i].peerId.toB58String())
}

const subscriptionPromises = psubs.map((psub, i) => checkReceivedSubscriptions(psub, getPeerIdStrs(i), topic))
psubs.forEach((ps) => ps.subscribe(topic))

// wait for heartbeats to build mesh
await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 2)))
await Promise.all(subscriptionPromises)

expectSet(new Set(psubs[0].peers.keys()), [psubs[1].peerId.toB58String(), psubs[5].peerId.toB58String()])
expectSet(new Set(psubs[1].peers.keys()), [
Expand All @@ -749,13 +813,15 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {
await Promise.all(sendRecv)
await tearDownGossipsubs(psubs)
})

it('test gossipsub star topology with signed peer records', async function () {
// Create 20 gossipsub nodes with lower degrees
// Connect nodes to a center node, with the center having very low degree
// Subscribe to the topic, all nodes
// Assert that all nodes have > 1 connection
// Publish one message per node
// Assert that the subscribed nodes receive every message
sinon.replace(constants, 'GossipsubPrunePeers', 5 as 16)
const psubs = await createGossipsubs({
number: 20,
options: {
Expand All @@ -782,10 +848,13 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {

// build the mesh
const topic = 'foobar'
const peerIdStrs = psubs.map((psub) => psub.peerId.toB58String())
const subscriptionPromise = checkReceivedSubscriptions(psubs[0], peerIdStrs, topic)
psubs.forEach((ps) => ps.subscribe(topic))

// wait a bit for the mesh to build
await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 15, 25000)))
await subscriptionPromise

// check that all peers have > 1 connection
psubs.forEach((ps) => {
Expand All @@ -806,6 +875,7 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {
await Promise.all(sendRecv)
await tearDownGossipsubs(psubs)
})

it('test gossipsub direct peers', async function () {
// Create 3 gossipsub nodes
// 2 and 3 with direct peer connections with each other
Expand Down Expand Up @@ -845,35 +915,43 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {
]
await Promise.all(psubs.map((ps) => ps.start()))
const multicodecs = psubs[0].multicodecs
// each peer connects to 2 other peers
let connectPromises = libp2ps.map((libp2p) => awaitEvents(libp2p.connectionManager, 'peer:connect', 2))
await libp2ps[0].dialProtocol(libp2ps[1].peerId, multicodecs)
await libp2ps[0].dialProtocol(libp2ps[2].peerId, multicodecs)

// verify that the direct peers connected
await delay(2000)
expect(libp2ps[1].connectionManager.get(libp2ps[2].peerId)).to.be.ok
await Promise.all(connectPromises)

const topic = 'foobar'
psubs.forEach((ps) => ps.subscribe(topic))

await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 1)))
const peerIdStrs = libp2ps.map((libp2p) => libp2p.peerId.toB58String())
let subscriptionPromises = psubs.map((psub) => checkReceivedSubscriptions(psub, peerIdStrs, topic))
psubs.forEach(ps => ps.subscribe(topic))
await Promise.all(psubs.map(ps => awaitEvents(ps, 'gossipsub:heartbeat', 1)))
await Promise.all(subscriptionPromises)

let sendRecv = []
for (let i = 0; i < 3; i++) {
const msg = uint8ArrayFromString(`${i} its not a flooooood ${i}`)
const owner = i
const results = Promise.all(
psubs.filter((psub, j) => j !== owner).map(checkReceivedMessage(topic, msg, owner, i))
psubs.filter((_, j) => j !== owner).map(checkReceivedMessage(topic, msg, owner, i))
)
sendRecv.push(psubs[owner].publish(topic, msg))
sendRecv.push(results)
}
await Promise.all(sendRecv)

connectPromises = [1, 2].map((i) => awaitEvents(libp2ps[i].connectionManager, 'peer:connect', 1))
// disconnect the direct peers to test reconnection
libp2ps[1].connectionManager.getAll(libp2ps[2].peerId).forEach((c) => c.close())
// need more time to disconnect/connect/send subscriptions again
subscriptionPromises = [
checkReceivedSubscription(psubs[1], peerIdStrs[2], topic, 2, 10000),
checkReceivedSubscription(psubs[2], peerIdStrs[1], topic, 1, 10000),
]
await libp2ps[1].hangUp(libp2ps[2].peerId);

await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 5)))

await Promise.all(connectPromises)
await Promise.all(subscriptionPromises)
expect(libp2ps[1].connectionManager.get(libp2ps[2].peerId)).to.be.ok

sendRecv = []
Expand All @@ -889,14 +967,16 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {
await Promise.all(sendRecv)
await tearDownGossipsubs(psubs)
})

it('test gossipsub flood publish', async function () {
// Create 30 gossipsub nodes
// Connect in star topology
// Subscribe to the topic, all nodes
// Publish 20 messages, each from the center node
// Assert that the other nodes receive the message
const numPeers = 30;
const psubs = await createGossipsubs({
number: 30,
number: numPeers,
options: { scoreParams: { IPColocationFactorThreshold: 30 } }
})

Expand All @@ -906,17 +986,21 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {
})
)

const owner = 0
const psub0 = psubs[owner]
const peerIdStrs = psubs.filter((_, j) => j !== owner).map(psub => psub.peerId.toB58String())
// build the (partial, unstable) mesh
const topic = 'foobar'
const subscriptionPromise = checkReceivedSubscriptions(psub0, peerIdStrs, topic)
psubs.forEach((ps) => ps.subscribe(topic))

await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 1)))
await subscriptionPromise

// send messages from the star and assert they were received
let sendRecv = []
for (let i = 0; i < 20; i++) {
const msg = uint8ArrayFromString(`${i} its not a flooooood ${i}`)
const owner = 0
const results = Promise.all(
psubs.filter((psub, j) => j !== owner).map(checkReceivedMessage(topic, msg, owner, i))
)
Expand All @@ -926,6 +1010,7 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {
await Promise.all(sendRecv)
await tearDownGossipsubs(psubs)
})

it('test gossipsub negative score', async function () {
// Create 20 gossipsub nodes, with scoring params to quickly lower node 0's score
// Connect densely
Expand Down