From 96b95494a618448bf30a43c05aa222264e9920b6 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Fri, 18 Oct 2024 10:23:16 +0800 Subject: [PATCH 1/2] Send IDONTWANT prior to publish --- .../io/libp2p/pubsub/gossip/GossipParams.kt | 2 +- .../io/libp2p/pubsub/gossip/GossipRouter.kt | 6 +++--- .../libp2p/pubsub/gossip/GossipV1_2Tests.kt | 21 +++++++++++++++++++ 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt index b2cad191c..a28200708 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt @@ -242,7 +242,7 @@ data class GossipParams( * [iDontWantMinMessageSizeThreshold] controls the minimum size (in bytes) that an incoming message needs to be so that an IDONTWANT message is sent to mesh peers. * The default is 16 KB. */ - val iDontWantMinMessageSizeThreshold: Int = 16000, + val iDontWantMinMessageSizeThreshold: Int = 16384, /** * [iDontWantTTL] Expiry time for cache of received IDONTWANT messages for peers diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index d139609f8..d4bd95091 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -166,7 +166,7 @@ open class GossipRouter( } override fun notifyUnseenMessage(peer: PeerHandler, msg: PubsubMessage) { - iDontWant(msg, peer) + iDontWant(msg) eventBroadcaster.notifyUnseenMessage(peer.peerId, msg) notifyAnyMessage(peer, msg) } @@ -420,6 +420,7 @@ open class GossipRouter( mCache += msg return if (peers.isNotEmpty()) { + iDontWant(msg) val publishedMessages = peers .filterNot { peerDoesNotWantMessage(it, msg.messageId) } .map { submitPublishMessage(it, msg) } @@ -610,7 +611,7 @@ open class GossipRouter( enqueueIwant(peer, messageIds) } - private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler) { + private fun iDontWant(msg: PubsubMessage) { if (!this.protocol.supportsIDontWant()) return if (msg.protobufMessage.data.size() < params.iDontWantMinMessageSizeThreshold) return // we need to send IDONTWANT messages to mesh peers immediately in order for them to have an effect @@ -618,7 +619,6 @@ open class GossipRouter( .mapNotNull { mesh[it] } .flatten() .distinct() - .minus(receivedFrom) .forEach { sendIdontwant(it, msg.messageId) } } diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_2Tests.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_2Tests.kt index cc7cd4d58..8ce1919c5 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_2Tests.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_2Tests.kt @@ -150,6 +150,27 @@ class GossipV1_2Tests : GossipTestsBase() { assertThat(receivedMessages).containsExactly(msg.protobufMessage) } + @Test + fun iDontWantIsSentOnPublishing() { + val test = startSingleTopicNetwork( + params = GossipParams(iDontWantMinMessageSizeThreshold = 5), + mockRouterCount = 3 + ) + + test.mockRouters.forEach { it.subscribe("topic1") } + val msgToPublish = newMessage("topic1", 0L, "Hello".toByteArray()) + test.gossipRouter.publish(msgToPublish) + test.mockRouters.forEach { + // IDONTWANT is received + it.waitForMessage { msg -> + msg.control.idontwantCount == 1 && + msg.control.idontwantList.first().messageIDsList.map { mIds -> mIds.toWBytes() }.contains(msgToPublish.messageId) + } + // msg is received + it.waitForMessage { msg -> msg.publishCount > 0 } + } + } + private fun startSingleTopicNetwork(params: GossipParams, mockRouterCount: Int): ManyRoutersTest { val test = ManyRoutersTest( protocol = PubsubProtocol.Gossip_V_1_2, From 4ad3068deea72f766d6675c65300d646f50d41c2 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Fri, 18 Oct 2024 18:13:52 +0800 Subject: [PATCH 2/2] feedback --- .../src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index d4bd95091..101986888 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -166,7 +166,7 @@ open class GossipRouter( } override fun notifyUnseenMessage(peer: PeerHandler, msg: PubsubMessage) { - iDontWant(msg) + iDontWant(msg, peer) eventBroadcaster.notifyUnseenMessage(peer.peerId, msg) notifyAnyMessage(peer, msg) } @@ -611,7 +611,7 @@ open class GossipRouter( enqueueIwant(peer, messageIds) } - private fun iDontWant(msg: PubsubMessage) { + private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler? = null) { if (!this.protocol.supportsIDontWant()) return if (msg.protobufMessage.data.size() < params.iDontWantMinMessageSizeThreshold) return // we need to send IDONTWANT messages to mesh peers immediately in order for them to have an effect @@ -619,6 +619,7 @@ open class GossipRouter( .mapNotNull { mesh[it] } .flatten() .distinct() + .minus(setOfNotNull(receivedFrom)) .forEach { sendIdontwant(it, msg.messageId) } }