Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 33 additions & 18 deletions libp2p/protocols/pubsub/floodsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import ./pubsub,
./timedcache,
./peertable,
./rpc/[message, messages, protobuf],
nimcrypto/[hash, sha2],
../../crypto/crypto,
../../stream/connection,
../../peerid,
Expand All @@ -32,20 +33,29 @@ const FloodSubCodec* = "/floodsub/1.0.0"
type
FloodSub* {.public.} = ref object of PubSub
floodsub*: PeerTable # topic to remote peer map
seen*: TimedCache[MessageId] # message id:s already seen on the network
seenSalt*: seq[byte]

proc hasSeen*(f: FloodSub, msgId: MessageId): bool =
f.seenSalt & msgId in f.seen

proc addSeen*(f: FloodSub, msgId: MessageId): bool =
# Salting the seen hash helps avoid attacks against the hash function used
# in the nim hash table
seen*: TimedCache[SaltedId]
# Early filter for messages recently observed on the network
# We use a salted id because the messages in this cache have not yet
# been validated meaning that an attacker has greater control over the
# hash key and therefore could poison the table
seenSalt*: sha256
# The salt in this case is a partially updated SHA256 context pre-seeded
# with some random data

proc salt*(f: FloodSub, msgId: MessageId): SaltedId =
var tmp = f.seenSalt
tmp.update(msgId)
SaltedId(data: tmp.finish())

proc hasSeen*(f: FloodSub, saltedId: SaltedId): bool =
saltedId in f.seen

proc addSeen*(f: FloodSub, saltedId: SaltedId): bool =
# Return true if the message has already been seen
f.seen.put(f.seenSalt & msgId)
f.seen.put(saltedId)

proc firstSeen*(f: FloodSub, msgId: MessageId): Moment =
f.seen.addedAt(f.seenSalt & msgId)
proc firstSeen*(f: FloodSub, saltedId: SaltedId): Moment =
f.seen.addedAt(saltedId)

proc handleSubscribe*(f: FloodSub,
peer: PubSubPeer,
Expand Down Expand Up @@ -117,9 +127,11 @@ method rpcHandler*(f: FloodSub,
# TODO: descore peers due to error during message validation (malicious?)
continue

let msgId = msgIdResult.get
let
msgId = msgIdResult.get
saltedId = f.salt(msgId)

if f.addSeen(msgId):
if f.addSeen(saltedId):
trace "Dropping already-seen message", msgId, peer
continue

Expand Down Expand Up @@ -216,7 +228,7 @@ method publish*(f: FloodSub,
trace "Created new message",
msg = shortLog(msg), peers = peers.len, topic, msgId

if f.addSeen(msgId):
if f.addSeen(f.salt(msgId)):
# custom msgid providers might cause this
trace "Dropping already-seen message", msgId, topic
return 0
Expand All @@ -234,8 +246,11 @@ method publish*(f: FloodSub,
method initPubSub*(f: FloodSub)
{.raises: [InitializationError].} =
procCall PubSub(f).initPubSub()
f.seen = TimedCache[MessageId].init(2.minutes)
f.seenSalt = newSeqUninitialized[byte](sizeof(Hash))
hmacDrbgGenerate(f.rng[], f.seenSalt)
f.seen = TimedCache[SaltedId].init(2.minutes)
f.seenSalt.init()

var tmp: array[32, byte]
hmacDrbgGenerate(f.rng[], tmp)
f.seenSalt.update(tmp)

f.init()
12 changes: 6 additions & 6 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =

proc validateAndRelay(g: GossipSub,
msg: Message,
msgId, msgIdSalted: MessageId,
msgId: MessageId, msgIdSalted: SaltedId,
Comment thread
diegomrsantos marked this conversation as resolved.
peer: PubSubPeer) {.async.} =
try:
let validation = await g.validate(msg)
Expand Down Expand Up @@ -508,12 +508,12 @@ method rpcHandler*(g: GossipSub,

let
msgId = msgIdResult.get
msgIdSalted = msgId & g.seenSalt
msgIdSalted = g.salt(msgId)
topic = msg.topic

# addSeen adds salt to msgId to avoid
# remote attacking the hash function
if g.addSeen(msgId):
if g.addSeen(msgIdSalted):
trace "Dropping already-seen message", msgId = shortLog(msgId), peer

var alreadyReceived = false
Expand All @@ -523,7 +523,7 @@ method rpcHandler*(g: GossipSub,
alreadyReceived = true

if not alreadyReceived:
let delay = Moment.now() - g.firstSeen(msgId)
let delay = Moment.now() - g.firstSeen(msgIdSalted)
g.rewardDelivered(peer, topic, false, delay)

libp2p_gossipsub_duplicate.inc()
Expand Down Expand Up @@ -690,7 +690,7 @@ method publish*(g: GossipSub,

trace "Created new message", msg = shortLog(msg), peers = peers.len

if g.addSeen(msgId):
if g.addSeen(g.salt(msgId)):
# custom msgid providers might cause this
trace "Dropping already-seen message"
return 0
Expand Down Expand Up @@ -779,7 +779,7 @@ method initPubSub*(g: GossipSub)
raise newException(InitializationError, $validationRes.error)

# init the floodsub stuff here, we customize timedcache in gossip!
g.seen = TimedCache[MessageId].init(g.parameters.seenTTL)
g.seen = TimedCache[SaltedId].init(g.parameters.seenTTL)

# init gossip stuff
g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength)
Expand Down
2 changes: 1 addition & 1 deletion libp2p/protocols/pubsub/gossipsub/behavior.nim
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ proc handleIHave*(g: GossipSub,
peer, topicID = ihave.topicID, msgs = ihave.messageIDs
if ihave.topicID in g.topics:
for msgId in ihave.messageIDs:
if not g.hasSeen(msgId):
if not g.hasSeen(g.salt(msgId)):
if peer.iHaveBudget <= 0:
break
elif msgId notin res.messageIDs:
Expand Down
2 changes: 1 addition & 1 deletion libp2p/protocols/pubsub/gossipsub/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ type
maxNumElementsInNonPriorityQueue*: int

BackoffTable* = Table[string, Table[PeerId, Moment]]
ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]
ValidationSeenTable* = Table[SaltedId, HashSet[PubSubPeer]]

RoutingRecordsPair* = tuple[id: PeerId, record: Option[PeerRecord]]
RoutingRecordsHandler* =
Expand Down
6 changes: 6 additions & 0 deletions libp2p/protocols/pubsub/rpc/messages.nim
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ type

MessageId* = seq[byte]

SaltedId* = object
# Salted hash of message ID - used instead of the ordinary message ID to
# avoid hash poisoning attacks and to make memory usage more predictable
# with respect to the variable-length message id
data*: MDigest[256]

Message* = object
fromPeer*: PeerId
data*: seq[byte]
Expand Down
63 changes: 45 additions & 18 deletions libp2p/protocols/pubsub/timedcache.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@

{.push raises: [].}

import std/[tables]

import std/[hashes, sets]
import chronos/timer, stew/results

import ../../utility

export results

const Timeout* = 10.seconds # default timeout in ms

type
Expand All @@ -26,20 +27,38 @@ type

TimedCache*[K] = object of RootObj
head, tail: TimedEntry[K] # nim linked list doesn't allow inserting at pos
entries: Table[K, TimedEntry[K]]
entries: HashSet[TimedEntry[K]]
timeout: Duration

func `==`*[E](a, b: TimedEntry[E]): bool =
if isNil(a) == isNil(b):
isNil(a) or a.key == b.key
else:
false

func hash*(a: TimedEntry): Hash =
if isNil(a):
default(Hash)
else:
hash(a[].key)

func expire*(t: var TimedCache, now: Moment = Moment.now()) =
while t.head != nil and t.head.expiresAt < now:
t.entries.del(t.head.key)
t.entries.excl(t.head)
t.head.prev = nil
t.head = t.head.next
if t.head == nil: t.tail = nil

func del*[K](t: var TimedCache[K], key: K): Opt[TimedEntry[K]] =
# Removes existing key from cache, returning the previous value if present
var item: TimedEntry[K]
if t.entries.pop(key, item):
let tmp = TimedEntry[K](key: key)
if tmp in t.entries:
let item = try:
t.entries[tmp] # use the shared instance in the set
except KeyError:
raiseAssert "just checked"
t.entries.excl(item)

if t.head == item: t.head = item.next
if t.tail == item: t.tail = item.prev

Expand All @@ -55,14 +74,14 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
# refreshed.
t.expire(now)

var previous = t.del(k) # Refresh existing item

var addedAt = now
previous.withValue(previous):
addedAt = previous.addedAt
let
previous = t.del(k) # Refresh existing item
addedAt = if previous.isSome():
Copy link
Copy Markdown
Contributor

@diegomrsantos diegomrsantos Apr 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had a long PR in the past to remove this pattern from the codebase and decrease the risk of raising defects. You can use https://github.com/vacp2p/nim-libp2p/blob/unstable/libp2p/utility.nim#L125

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

valueOr is not applicable in this case because we're accessing a field of previous[], not previous itself

Copy link
Copy Markdown
Contributor

@diegomrsantos diegomrsantos May 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but withValue can be used in this case.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't work in generic code, due to similar problems as arnetheduck/nim-results#34

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems to work fine:

    addedAt = block:
      previous.withValue(p):
        p[].addedAt
      else:
        now

previous[].addedAt
else:
now

let node = TimedEntry[K](key: k, addedAt: addedAt, expiresAt: now + t.timeout)

if t.head == nil:
t.tail = node
t.head = t.tail
Expand All @@ -83,16 +102,24 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
if cur == t.tail:
t.tail = node

t.entries[k] = node
t.entries.incl(node)

previous.isSome()

func contains*[K](t: TimedCache[K], k: K): bool =
k in t.entries

func addedAt*[K](t: TimedCache[K], k: K): Moment =
t.entries.getOrDefault(k).addedAt

let tmp = TimedEntry[K](key: k)
tmp in t.entries

func addedAt*[K](t: var TimedCache[K], k: K): Moment =
Comment thread
diegomrsantos marked this conversation as resolved.
let tmp = TimedEntry[K](key: k)
try:
if tmp in t.entries: # raising is slow
# Use shared instance from entries
return t.entries[tmp][].addedAt
except KeyError:
raiseAssert "just checked"

default(Moment)

func init*[K](T: type TimedCache[K], timeout: Duration = Timeout): T =
T(
Expand Down
4 changes: 2 additions & 2 deletions tests/pubsub/testgossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -569,8 +569,8 @@ suite "GossipSub":
proc slowValidator(topic: string, message: Message): Future[ValidationResult] {.async.} =
await cRelayed
# Empty A & C caches to detect duplicates
gossip1.seen = TimedCache[MessageId].init()
gossip3.seen = TimedCache[MessageId].init()
gossip1.seen = TimedCache[SaltedId].init()
gossip3.seen = TimedCache[SaltedId].init()
let msgId = toSeq(gossip2.validationSeen.keys)[0]
checkUntilTimeout(try: gossip2.validationSeen[msgId].len > 0 except: false)
result = ValidationResult.Accept
Expand Down
19 changes: 19 additions & 0 deletions tests/pubsub/testtimedcache.nim
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ suite "TimedCache":
2 in cache
3 in cache

cache.addedAt(2) == now + 3.seconds

check:
cache.put(2, now + 7.seconds) # refreshes 2
not cache.put(4, now + 12.seconds) # expires 3
Expand All @@ -33,6 +35,23 @@ suite "TimedCache":
3 notin cache
4 in cache

check:
cache.del(4).isSome()
4 notin cache

check:
not cache.put(100, now + 100.seconds) # expires everything
100 in cache
2 notin cache

test "enough items to force cache heap storage growth":
var cache = TimedCache[int].init(5.seconds)

let now = Moment.now()
for i in 101..100000:
check:
not cache.put(i, now)

for i in 101..100000:
check:
i in cache