From 9cb820822df84042bd26f09ff7a1f4483bec6479 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Tue, 19 Mar 2024 15:49:39 +0100 Subject: [PATCH 1/5] Improve memory efficiency of seen cache The `seen` cache currently is a significant memory usage hotspot due to its inefficient implementation: for every entry, two copies of the message id + timing data + `seq` overhead cause it to use much more memory than it has to. In addition, each check involves several layers of allocations as the computed message id gets salted. This PR improves on the situation by: * using a hash of the message id with the salt instead of joining strings * computing the salted id only once per message * storing one digest instead of two message ids --- libp2p/protocols/pubsub/floodsub.nim | 41 ++++++++----- libp2p/protocols/pubsub/gossipsub.nim | 12 ++-- .../protocols/pubsub/gossipsub/behavior.nim | 2 +- libp2p/protocols/pubsub/gossipsub/types.nim | 2 +- libp2p/protocols/pubsub/rpc/messages.nim | 6 ++ libp2p/protocols/pubsub/timedcache.nim | 59 +++++++++++++------ tests/pubsub/testgossipsub.nim | 4 +- 7 files changed, 84 insertions(+), 42 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 8809d6fe71..2d7ebcb2bb 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -16,6 +16,7 @@ import ./pubsub, ./timedcache, ./peertable, ./rpc/[message, messages, protobuf], + nimcrypto/[hash, sha2], ../../crypto/crypto, ../../stream/connection, ../../peerid, @@ -32,20 +33,27 @@ const FloodSubCodec* = "/floodsub/1.0.0" type FloodSub* {.public.} = ref object of PubSub floodsub*: PeerTable # topic to remote peer map - seen*: TimedCache[MessageId] # message id:s already seen on the network - seenSalt*: seq[byte] + seen*: TimedCache[SaltedId] # message id:s already seen on the network + seenSalt*: sha256 + # The salt in this case is a partially updated SHA256 context pre-seeded + # with some random data -proc 
hasSeen*(f: FloodSub, msgId: MessageId): bool = - f.seenSalt & msgId in f.seen +proc salt*(f: FloodSub, msgId: MessageId): SaltedId = + var tmp = f.seenSalt + tmp.update(msgId) + SaltedId(data: tmp.finish()) -proc addSeen*(f: FloodSub, msgId: MessageId): bool = +proc hasSeen*(f: FloodSub, msgId: SaltedId): bool = + msgId in f.seen + +proc addSeen*(f: FloodSub, msgId: SaltedId): bool = # Salting the seen hash helps avoid attacks against the hash function used # in the nim hash table # Return true if the message has already been seen - f.seen.put(f.seenSalt & msgId) + f.seen.put(msgId) -proc firstSeen*(f: FloodSub, msgId: MessageId): Moment = - f.seen.addedAt(f.seenSalt & msgId) +proc firstSeen*(f: FloodSub, msgId: SaltedId): Moment = + f.seen.addedAt(msgId) proc handleSubscribe*(f: FloodSub, peer: PubSubPeer, @@ -117,9 +125,11 @@ method rpcHandler*(f: FloodSub, # TODO: descore peers due to error during message validation (malicious?) continue - let msgId = msgIdResult.get + let + msgId = msgIdResult.get + saltedId = f.salt(msgId) - if f.addSeen(msgId): + if f.addSeen(saltedId): trace "Dropping already-seen message", msgId, peer continue @@ -213,7 +223,7 @@ method publish*(f: FloodSub, trace "Created new message", msg = shortLog(msg), peers = peers.len, topic, msgId - if f.addSeen(msgId): + if f.addSeen(f.salt(msgId)): # custom msgid providers might cause this trace "Dropping already-seen message", msgId, topic return 0 @@ -231,8 +241,11 @@ method publish*(f: FloodSub, method initPubSub*(f: FloodSub) {.raises: [InitializationError].} = procCall PubSub(f).initPubSub() - f.seen = TimedCache[MessageId].init(2.minutes) - f.seenSalt = newSeqUninitialized[byte](sizeof(Hash)) - hmacDrbgGenerate(f.rng[], f.seenSalt) + f.seen = TimedCache[SaltedId].init(2.minutes) + f.seenSalt.init() + + var tmp: array[32, byte] + hmacDrbgGenerate(f.rng[], tmp) + f.seenSalt.update(tmp) f.init() diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 
0798081292..ad4c8ea064 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -318,7 +318,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = proc validateAndRelay(g: GossipSub, msg: Message, - msgId, msgIdSalted: MessageId, + msgId: MessageId, msgIdSalted: SaltedId, peer: PubSubPeer) {.async.} = try: let validation = await g.validate(msg) @@ -478,11 +478,11 @@ method rpcHandler*(g: GossipSub, let msgId = msgIdResult.get - msgIdSalted = msgId & g.seenSalt + msgIdSalted = g.salt(msgId) # addSeen adds salt to msgId to avoid # remote attacking the hash function - if g.addSeen(msgId): + if g.addSeen(msgIdSalted): trace "Dropping already-seen message", msgId = shortLog(msgId), peer var alreadyReceived = false @@ -492,7 +492,7 @@ method rpcHandler*(g: GossipSub, alreadyReceived = true if not alreadyReceived: - let delay = Moment.now() - g.firstSeen(msgId) + let delay = Moment.now() - g.firstSeen(msgIdSalted) g.rewardDelivered(peer, msg.topicIds, false, delay) libp2p_gossipsub_duplicate.inc() @@ -659,7 +659,7 @@ method publish*(g: GossipSub, trace "Created new message", msg = shortLog(msg), peers = peers.len - if g.addSeen(msgId): + if g.addSeen(g.salt(msgId)): # custom msgid providers might cause this trace "Dropping already-seen message" return 0 @@ -748,7 +748,7 @@ method initPubSub*(g: GossipSub) raise newException(InitializationError, $validationRes.error) # init the floodsub stuff here, we customize timedcache in gossip! 
- g.seen = TimedCache[MessageId].init(g.parameters.seenTTL) + g.seen = TimedCache[SaltedId].init(g.parameters.seenTTL) # init gossip stuff g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength) diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index b626cb1da0..5c92118ec2 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -251,7 +251,7 @@ proc handleIHave*(g: GossipSub, peer, topic = ihave.topicId, msgs = ihave.messageIds if ihave.topicId in g.topics: for msgId in ihave.messageIds: - if not g.hasSeen(msgId): + if not g.hasSeen(g.salt(msgId)): if peer.iHaveBudget <= 0: break elif msgId notin res.messageIds: diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 06fa55eb30..85ab59bc35 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -148,7 +148,7 @@ type disconnectPeerAboveRateLimit*: bool BackoffTable* = Table[string, Table[PeerId, Moment]] - ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]] + ValidationSeenTable* = Table[SaltedId, HashSet[PubSubPeer]] RoutingRecordsPair* = tuple[id: PeerId, record: Option[PeerRecord]] RoutingRecordsHandler* = diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index 77baded788..a5dbba24d6 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -37,6 +37,12 @@ type MessageId* = seq[byte] + SaltedId* = object + # Salted hash of message ID - used instead of the ordinary message ID to + # avoid hash poisoning attacks and to make memory usage more predictable + # with respect to the variable-length message id + data*: MDigest[256] + Message* = object fromPeer*: PeerId data*: seq[byte] diff --git a/libp2p/protocols/pubsub/timedcache.nim b/libp2p/protocols/pubsub/timedcache.nim 
index fbac8db6bf..891879b227 100644 --- a/libp2p/protocols/pubsub/timedcache.nim +++ b/libp2p/protocols/pubsub/timedcache.nim @@ -9,8 +9,7 @@ {.push raises: [].} -import std/[tables] - +import std/[hashes, sets] import chronos/timer, stew/results import ../../utility @@ -26,20 +25,36 @@ type TimedCache*[K] = object of RootObj head, tail: TimedEntry[K] # nim linked list doesn't allow inserting at pos - entries: Table[K, TimedEntry[K]] + entries: HashSet[TimedEntry[K]] timeout: Duration +func `==`*[E](a, b: TimedEntry[E]): bool = + if isNil(a) == isNil(b): + isNil(a) or a.key == b.key + else: + false + +func hash*(a: TimedEntry): Hash = + if isNil(a): + hash(a.key) + else: + default(Hash) + func expire*(t: var TimedCache, now: Moment = Moment.now()) = while t.head != nil and t.head.expiresAt < now: - t.entries.del(t.head.key) + t.entries.excl(t.head) t.head.prev = nil t.head = t.head.next if t.head == nil: t.tail = nil func del*[K](t: var TimedCache[K], key: K): Opt[TimedEntry[K]] = # Removes existing key from cache, returning the previous value if present - var item: TimedEntry[K] - if t.entries.pop(key, item): + let tmp = TimedEntry[K](key: key) + if tmp in t.entries: + let item = try: + t.entries[tmp] # use the shared instance in the set + except KeyError as exc: + raiseAssert "just checked" if t.head == item: t.head = item.next if t.tail == item: t.tail = item.prev @@ -55,14 +70,14 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool = # refreshed. 
t.expire(now) - var previous = t.del(k) # Refresh existing item - - var addedAt = now - previous.withValue(previous): - addedAt = previous.addedAt + let + previous = t.del(k) # Refresh existing item + addedAt = if previous.isSome(): + previous[].addedAt + else: + now let node = TimedEntry[K](key: k, addedAt: addedAt, expiresAt: now + t.timeout) - if t.head == nil: t.tail = node t.head = t.tail @@ -83,16 +98,24 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool = if cur == t.tail: t.tail = node - t.entries[k] = node + t.entries.incl(node) previous.isSome() func contains*[K](t: TimedCache[K], k: K): bool = - k in t.entries - -func addedAt*[K](t: TimedCache[K], k: K): Moment = - t.entries.getOrDefault(k).addedAt - + let tmp = TimedEntry[K](key: k) + tmp in t.entries + +func addedAt*[K](t: var TimedCache[K], k: K): Moment = + let tmp = TimedEntry[K](key: k) + try: + if tmp in t.entries: # raising is slow + # Use shared instance from entries + return t.entries[tmp][].addedAt + except KeyError: + raiseAssert "just checked" + + default(Moment) func init*[K](T: type TimedCache[K], timeout: Duration = Timeout): T = T( diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index a843ed51d5..736e8d4a7a 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -569,8 +569,8 @@ suite "GossipSub": proc slowValidator(topic: string, message: Message): Future[ValidationResult] {.async.} = await cRelayed # Empty A & C caches to detect duplicates - gossip1.seen = TimedCache[MessageId].init() - gossip3.seen = TimedCache[MessageId].init() + gossip1.seen = TimedCache[SaltedId].init() + gossip3.seen = TimedCache[SaltedId].init() let msgId = toSeq(gossip2.validationSeen.keys)[0] checkUntilTimeout(try: gossip2.validationSeen[msgId].len > 0 except: false) result = ValidationResult.Accept From 4254ced15e9417cbc1b450572f17c0fee8f0f75d Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Tue, 19 Mar 2024 23:09:13 +0100 Subject: 
[PATCH 2/5] fix del, hash --- libp2p/protocols/pubsub/timedcache.nim | 10 +++++++--- tests/pubsub/testtimedcache.nim | 4 ++++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/libp2p/protocols/pubsub/timedcache.nim b/libp2p/protocols/pubsub/timedcache.nim index 891879b227..ca08f0aef4 100644 --- a/libp2p/protocols/pubsub/timedcache.nim +++ b/libp2p/protocols/pubsub/timedcache.nim @@ -14,6 +14,8 @@ import chronos/timer, stew/results import ../../utility +export results + const Timeout* = 10.seconds # default timeout in ms type @@ -36,9 +38,9 @@ func `==`*[E](a, b: TimedEntry[E]): bool = func hash*(a: TimedEntry): Hash = if isNil(a): - hash(a.key) - else: default(Hash) + else: + hash(a[].key) func expire*(t: var TimedCache, now: Moment = Moment.now()) = while t.head != nil and t.head.expiresAt < now: @@ -53,8 +55,10 @@ func del*[K](t: var TimedCache[K], key: K): Opt[TimedEntry[K]] = if tmp in t.entries: let item = try: t.entries[tmp] # use the shared instance in the set - except KeyError as exc: + except KeyError: raiseAssert "just checked" + t.entries.excl(item) + if t.head == item: t.head = item.next if t.tail == item: t.tail = item.prev diff --git a/tests/pubsub/testtimedcache.nim b/tests/pubsub/testtimedcache.nim index 3fcbf28f2c..10a6a3bcdb 100644 --- a/tests/pubsub/testtimedcache.nim +++ b/tests/pubsub/testtimedcache.nim @@ -33,6 +33,10 @@ suite "TimedCache": 3 notin cache 4 in cache + check: + cache.del(4).isSome() + 4 notin cache + check: not cache.put(100, now + 100.seconds) # expires everything 100 in cache From 3c74db539d6a8ab0810fa43493aef35d3368bf46 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Wed, 20 Mar 2024 06:57:16 +0100 Subject: [PATCH 3/5] a few more tests --- tests/pubsub/testtimedcache.nim | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/pubsub/testtimedcache.nim b/tests/pubsub/testtimedcache.nim index 10a6a3bcdb..912970e57e 100644 --- a/tests/pubsub/testtimedcache.nim +++ b/tests/pubsub/testtimedcache.nim 
@@ -24,6 +24,8 @@ suite "TimedCache": 2 in cache 3 in cache + cache.addedAt(2) == now + 3.seconds + check: cache.put(2, now + 7.seconds) # refreshes 2 not cache.put(4, now + 12.seconds) # expires 3 @@ -40,3 +42,16 @@ suite "TimedCache": check: not cache.put(100, now + 100.seconds) # expires everything 100 in cache + 2 notin cache + + test "spam": + var cache = TimedCache[int].init(5.seconds) + + let now = Moment.now() + for i in 101..100000: + check: + not cache.put(i, now) + + for i in 101..100000: + check: + i in cache From 513ee0bba5b7713745632a6609c90693baf8357e Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Thu, 4 Apr 2024 14:16:33 +0200 Subject: [PATCH 4/5] document salting strategy --- libp2p/protocols/pubsub/floodsub.nim | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 6e20c77703..0b4d886f48 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -33,7 +33,11 @@ const FloodSubCodec* = "/floodsub/1.0.0" type FloodSub* {.public.} = ref object of PubSub floodsub*: PeerTable # topic to remote peer map - seen*: TimedCache[SaltedId] # message id:s already seen on the network + seen*: TimedCache[SaltedId] + # Early filter for messages recently observed on the network + # We use a salted id because the messages in this cache have not yet + # been validated meaning that an attacker has greater control over the + # hash key and therefore could poison the table seenSalt*: sha256 # The salt in this case is a partially updated SHA256 context pre-seeded # with some random data @@ -43,17 +47,15 @@ proc salt*(f: FloodSub, msgId: MessageId): SaltedId = tmp.update(msgId) SaltedId(data: tmp.finish()) -proc hasSeen*(f: FloodSub, msgId: SaltedId): bool = - msgId in f.seen +proc hasSeen*(f: FloodSub, saltedId: SaltedId): bool = + saltedId in f.seen -proc addSeen*(f: FloodSub, msgId: SaltedId): bool = - # Salting the 
seen hash helps avoid attacks against the hash function used - # in the nim hash table +proc addSeen*(f: FloodSub, saltedId: SaltedId): bool = # Return true if the message has already been seen - f.seen.put(msgId) + f.seen.put(saltedId) -proc firstSeen*(f: FloodSub, msgId: SaltedId): Moment = - f.seen.addedAt(msgId) +proc firstSeen*(f: FloodSub, saltedId: SaltedId): Moment = + f.seen.addedAt(saltedId) proc handleSubscribe*(f: FloodSub, peer: PubSubPeer, From 6c42af5f0108b16e85f876ff8c13b7d5f03703db Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Wed, 1 May 2024 13:13:18 +0200 Subject: [PATCH 5/5] test name --- tests/pubsub/testtimedcache.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/pubsub/testtimedcache.nim b/tests/pubsub/testtimedcache.nim index 912970e57e..917ddfe269 100644 --- a/tests/pubsub/testtimedcache.nim +++ b/tests/pubsub/testtimedcache.nim @@ -44,7 +44,7 @@ suite "TimedCache": 100 in cache 2 notin cache - test "spam": + test "enough items to force cache heap storage growth": var cache = TimedCache[int].init(5.seconds) let now = Moment.now()