Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 41 additions & 21 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ proc init*(
overheadRateLimit = Opt.none(tuple[bytes: int, interval: Duration]),
disconnectPeerAboveRateLimit = false,
maxNumElementsInNonPriorityQueue = DefaultMaxNumElementsInNonPriorityQueue,
sendIDontWantOnPublish = false,
): GossipSubParams =
GossipSubParams(
explicit: true,
Expand Down Expand Up @@ -139,6 +140,7 @@ proc init*(
overheadRateLimit: overheadRateLimit,
disconnectPeerAboveRateLimit: disconnectPeerAboveRateLimit,
maxNumElementsInNonPriorityQueue: maxNumElementsInNonPriorityQueue,
sendIDontWantOnPublish: sendIDontWantOnPublish,
)

proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
Expand Down Expand Up @@ -381,6 +383,40 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
trace "sending iwant reply messages", peer
g.send(peer, RPCMsg(messages: messages), isHighPriority = false)

proc sendIDontWant(
    g: GossipSub,
    msg: Message,
    msgId: MessageId,
    peersToSendIDontWant: HashSet[PubSubPeer],
) =
  ## Broadcasts an IDONTWANT control message for `msgId` to every peer in
  ## `peersToSendIDontWant` whose negotiated codec supports it.
  ##
  ## Sent regardless of whether the message is valid: for an invalid
  ## message this is somewhat redundant (an honest peer would not forward
  ## it to us anyway), but such peers are expected to be rare (they get
  ## descored), and the savings from honest peers on large messages
  ## outweigh the small cost a dishonest peer can inflict, since the
  ## IDONTWANT itself is tiny.
  ##
  ## NOTE(review): `msg` is currently unused here — confirm whether it is
  ## kept for interface symmetry or future use.

  # IDONTWANT only exists from GossipSubCodec_12 onwards, so drop peers
  # still speaking the 1.0 / 1.1 codecs.
  var capablePeers = HashSet[PubSubPeer]()
  for p in peersToSendIDontWant:
    if p.codec != GossipSubCodec_10 and p.codec != GossipSubCodec_11:
      capablePeers.incl(p)

  let dontWant = ControlIWant(messageIDs: @[msgId])
  g.broadcast(
    capablePeers,
    RPCMsg(control: some(ControlMessage(idontwant: @[dontWant]))),
    isHighPriority = true,
  )

# Minimum payload size, in bytes, above which a message may be considered
# "large" for IDONTWANT purposes.
const iDontWantMessageSizeThreshold* = 512

# True when the payload is big enough that broadcasting an IDONTWANT for it
# is worthwhile: larger than the fixed threshold AND more than 10x the
# message-id length (presumably so the IDONTWANT — which carries the id
# itself — stays much smaller than the data it saves; TODO confirm the
# rationale for the factor of 10).
proc isLargeMessage(msg: Message, msgId: MessageId): bool =
  msg.data.len > max(iDontWantMessageSizeThreshold, msgId.len * 10)

proc validateAndRelay(
g: GossipSub, msg: Message, msgId: MessageId, saltedId: SaltedId, peer: PubSubPeer
) {.async: (raises: []).} =
Expand All @@ -397,29 +433,10 @@ proc validateAndRelay(
toSendPeers.incl(peers[])
toSendPeers.excl(peer)

if msg.data.len > max(512, msgId.len * 10):
# If the message is "large enough", let the mesh know that we do not want
# any more copies of it, regardless if it is valid or not.
#
# In the case that it is not valid, this leads to some redundancy
# (since the other peer should not send us an invalid message regardless),
# but the expectation is that this is rare (due to such peers getting
# descored) and that the savings from honest peers are greater than the
# cost a dishonest peer can incur in short time (since the IDONTWANT is
# small).
if isLargeMessage(msg, msgId):
var peersToSendIDontWant = HashSet[PubSubPeer]()
addToSendPeers(peersToSendIDontWant)
peersToSendIDontWant.exclIfIt(
it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11
)
g.broadcast(
peersToSendIDontWant,
RPCMsg(
control:
some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])]))
),
isHighPriority = true,
)
g.sendIDontWant(msg, msgId, peersToSendIDontWant)

let validation = await g.validate(msg)

Expand Down Expand Up @@ -788,6 +805,9 @@ method publish*(

g.mcache.put(msgId, msg)

if g.parameters.sendIDontWantOnPublish and isLargeMessage(msg, msgId):
g.sendIDontWant(msg, msgId, peers)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... why should we send IDONTWANT after broadcasting the message? If the other peers receive the message, they already know that I don't want it.

There exists a case where sending it before broadcasting makes sense, though I'm not sure how well it has been studied — i.e., it needs research: streaming a large message to a peer takes time, and in the meantime that peer might start streaming the same message to us. An IDONTWANT sent before the message tells the other peer not to do that.


g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)

if g.knownTopics.contains(topic):
Expand Down
3 changes: 3 additions & 0 deletions libp2p/protocols/pubsub/gossipsub/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ type
# Max number of elements allowed in the non-priority queue. When this limit has been reached, the peer will be disconnected.
maxNumElementsInNonPriorityQueue*: int

# Broadcast an IDONTWANT message automatically on publish when the published message exceeds the IDONTWANT message size threshold
sendIDontWantOnPublish*: bool

BackoffTable* = Table[string, Table[PeerId, Moment]]
ValidationSeenTable* = Table[SaltedId, HashSet[PubSubPeer]]

Expand Down
8 changes: 8 additions & 0 deletions libp2p/utility.nim
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,11 @@ template exclIfIt*[T](set: var HashSet[T], condition: untyped) =
if condition:
toExcl.incl(it)
set.excl(toExcl)

template filterIt*[T](set: HashSet[T], condition: untyped): HashSet[T] =
  ## Returns a new `HashSet` containing the elements of `set` for which
  ## `condition` holds; each element is injected into `condition` as `it`,
  ## mirroring the sibling `exclIfIt` template above.
  ##
  ## NOTE(review): this shadows `sequtils.filterIt` (which returns a `seq`)
  ## for `HashSet` arguments — confirm no call site relies on the
  ## seq-returning overload.
  # No emptiness guard needed: iterating an empty HashSet is a no-op, and
  # `exclIfIt` performs no such check either.
  var filtered = HashSet[T]()
  for it {.inject.} in set:
    if condition:
      filtered.incl(it)
  filtered
38 changes: 38 additions & 0 deletions tests/pubsub/testgossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,44 @@ suite "GossipSub":

await allFuturesThrowing(nodesFut.concat())

asyncTest "e2e - iDontWant is broadcasted on publish":
  # Fixed 10-byte message id: isLargeMessage then compares the payload
  # against max(512, 10 * 10) = 512, so the 10_000-byte payload published
  # below is comfortably "large".
  func dumbMsgIdProvider(m: Message): Result[MessageId, ValidationResult] =
    ok(newSeq[byte](10))
  let
    # Two gossipsub nodes; sendIDontWantOnPublish makes the publisher emit
    # an IDONTWANT for large messages at publish time.
    nodes = generateNodes(
      2,
      gossip = true,
      msgIdProvider = dumbMsgIdProvider,
      sendIDontWantOnPublish = true,
    )

    nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start())

  await nodes[0].switch.connect(
    nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs
  )

  # Message handlers are irrelevant here — only control traffic is checked.
  proc handlerA(topic: string, data: seq[byte]) {.async: (raises: []).} =
    discard

  proc handlerB(topic: string, data: seq[byte]) {.async: (raises: []).} =
    discard

  nodes[0].subscribe("foobar", handlerA)
  nodes[1].subscribe("foobar", handlerB)
  await waitSubGraph(nodes, "foobar")

  var gossip2: GossipSub = GossipSub(nodes[1])

  # Publish a payload well above the IDONTWANT size threshold.
  tryPublish await nodes[0].publish("foobar", newSeq[byte](10000)), 1

  # The receiving node should record one IDONTWANT id (presumably the one
  # the publisher sent on publish) for its mesh peer on "foobar".
  checkUntilTimeout:
    gossip2.mesh.getOrDefault("foobar").anyIt(it.iDontWants[^1].len == 1)

  await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop())

  await allFuturesThrowing(nodesFut.concat())

asyncTest "e2e - iDontWant is sent only for 1.2":
# 3 nodes: A <=> B <=> C
# (A & C are NOT connected). We pre-emptively send a dontwant from C to B,
Expand Down
2 changes: 2 additions & 0 deletions tests/pubsub/utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ proc generateNodes*(
overheadRateLimit: Opt[tuple[bytes: int, interval: Duration]] =
Opt.none(tuple[bytes: int, interval: Duration]),
gossipSubVersion: string = "",
sendIDontWantOnPublish: bool = false,
): seq[PubSub] =
for i in 0 ..< num:
let switch = newStandardSwitch(
Expand All @@ -97,6 +98,7 @@ proc generateNodes*(
p.unsubscribeBackoff = unsubscribeBackoff
p.enablePX = enablePX
p.overheadRateLimit = overheadRateLimit
p.sendIDontWantOnPublish = sendIDontWantOnPublish
p
),
)
Expand Down