From bf3416ae9648e38757aa0bcd73264646ac94b43f Mon Sep 17 00:00:00 2001 From: cheatfate Date: Wed, 19 Mar 2025 14:07:52 +0200 Subject: [PATCH 01/11] Add custom filters for PeerPool. --- beacon_chain/networking/eth2_network.nim | 13 +- beacon_chain/networking/peer_pool.nim | 915 +++++++++++++---------- tests/test_peer_pool.nim | 309 +++++++- 3 files changed, 819 insertions(+), 418 deletions(-) diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index b9d3d37970..4e1a06c714 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -480,15 +480,12 @@ func netKbps*(peer: Peer): float {.inline.} = ## Returns current network throughput average value in Kbps for peer ``peer``. round(((peer.netThroughput.average / 1024) * 10_000) / 10_000) -# /!\ Must be exported to be seen by `peerCmp` -func `<`*(a, b: Peer): bool = - ## Comparison function indicating `true` if peer `a` ranks worse than peer `b` - if a.score != b.score: - a.score < b.score - elif a.netThroughput.average != b.netThroughput.average: - a.netThroughput.average < b.netThroughput.average +# /!\ Must be exported to be seen by `peerpool`. +proc cmp*(a, b: Peer): int = + if a.score == b.score: + cmp(a.netThroughput.average, b.netThroughput.average) else: - system.`<`(a, b) + cmp(a.score, b.score) const maxRequestQuota = 1000000 diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index 611c86e304..f03978f5a5 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2018-2024 Status Research & Development GmbH +# Copyright (c) 2018-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -7,7 +7,7 @@ {.push raises: [].} -import std/[tables, heapqueue] +import std/[tables, heapqueue, algorithm, sequtils] import chronos export tables @@ -19,9 +19,6 @@ type PeerFlags = enum Acquired, DeleteOnRelease - EventType = enum - NotEmptyEvent, NotFullEvent - PeerStatus* = enum Success, ## Peer was successfully added to PeerPool. DuplicateError, ## Peer is already present in PeerPool. @@ -29,15 +26,13 @@ type LowScoreError, ## Peer has too low score. DeadPeerError ## Peer is already dead. + PeerIndex = distinct int + PeerItem[T] = object data: T peerType: PeerType flags: set[PeerFlags] - index: int - - PeerIndex = object - data: int - cmp: proc(a, b: PeerIndex): bool {.gcsafe, raises: [].} + index: PeerIndex PeerScoreCheckCallback*[T] = proc(peer: T): bool {.gcsafe, raises: [].} @@ -45,16 +40,15 @@ type PeerOnDeleteCallback*[T] = proc(peer: T) {.gcsafe, raises: [].} + PeerCustomFilterCallback*[T] = proc(peer: T): bool {.gcsafe, raises: [].} + PeerPool*[A, B] = ref object - incNotEmptyEvent*: AsyncEvent - outNotEmptyEvent*: AsyncEvent - incNotFullEvent*: AsyncEvent - outNotFullEvent*: AsyncEvent - incQueue: HeapQueue[PeerIndex] - outQueue: HeapQueue[PeerIndex] - registry: Table[B, PeerIndex] + changeEvent: AsyncEvent storage: seq[PeerItem[A]] - cmp: proc(a, b: PeerIndex): bool {.gcsafe, raises: [].} + registry: Table[B, PeerIndex] + sorted: seq[PeerIndex] + unsorted: seq[PeerIndex] + empties: seq[PeerIndex] scoreCheck: PeerScoreCheckCallback[A] onDeletePeer: PeerOnDeleteCallback[A] peerCounter: PeerCounterCallback @@ -68,91 +62,38 @@ type PeerPoolError* = object of CatchableError -proc `<`*(a, b: PeerIndex): bool = - ## PeerIndex ``a`` holds reference to ``cmp()`` procedure which has captured - ## PeerPool instance. - a.cmp(b, a) - -proc fireNotEmptyEvent[A, B](pool: PeerPool[A, B], - item: PeerItem[A]) = - case item.peerType: - of PeerType.Incoming: - pool.incNotEmptyEvent.fire() - of PeerType.Outgoing: - pool.outNotEmptyEvent.fire() - -proc fireNotFullEvent[A, B](pool: PeerPool[A, B], - item: PeerItem[A]) = - case item.peerType: - of PeerType.Incoming: - pool.incNotFullEvent.fire() - of PeerType.Outgoing: - pool.outNotFullEvent.fire() +proc `==`*(a, b: PeerIndex): bool {.borrow.} iterator pairs*[A, B](pool: PeerPool[A, B]): (B, A) = - for peerId, peerIdx in pool.registry: - yield (peerId, pool.storage[peerIdx.data].data) - -template incomingEvent(eventType: EventType): AsyncEvent = - case eventType - of EventType.NotEmptyEvent: - pool.incNotEmptyEvent - of EventType.NotFullEvent: - pool.incNotFullEvent - -template outgoingEvent(eventType: EventType): AsyncEvent = - case eventType - of EventType.NotEmptyEvent: - pool.outNotEmptyEvent - of EventType.NotFullEvent: - pool.outNotFullEvent - -proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType, - filter: set[PeerType]) {.async: (raises: [CancelledError]).} = - if filter == {PeerType.Incoming, PeerType.Outgoing} or filter == {}: - var fut1 = incomingEvent(eventType).wait() - var fut2 = outgoingEvent(eventType).wait() - try: - try: - discard await one(fut1, fut2) - except ValueError: - raiseAssert "one precondition satisfied" - if fut1.finished(): - if not(fut2.finished()): - await fut2.cancelAndWait() - incomingEvent(eventType).clear() - else: - if not(fut1.finished()): - await fut1.cancelAndWait() - outgoingEvent(eventType).clear() - except CancelledError as exc: - var pending: seq[FutureBase] - if not(fut1.finished()): - pending.add(fut1.cancelAndWait()) - if not(fut2.finished()): - pending.add(fut2.cancelAndWait()) - await noCancel allFutures(pending) - raise exc - elif PeerType.Incoming in filter: - await incomingEvent(eventType).wait() - incomingEvent(eventType).clear() - elif PeerType.Outgoing in filter: - await outgoingEvent(eventType).wait() - outgoingEvent(eventType).clear() - -proc waitNotEmptyEvent[A, B](pool: PeerPool[A, B], - filter: set[PeerType]) {.async: (raises: [CancelledError], raw: true).} = - pool.waitForEvent(EventType.NotEmptyEvent, filter) - -proc waitNotFullEvent[A, B](pool: PeerPool[A, B], - filter: set[PeerType]){.async: (raises: [CancelledError], raw: true).} = - pool.waitForEvent(EventType.NotFullEvent, filter) - -proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1, - maxOutgoingPeers = -1, - scoreCheckCb: PeerScoreCheckCallback[A] = nil, - peerCounterCb: PeerCounterCallback = nil, - onDeleteCb: PeerOnDeleteCallback[A] = nil): PeerPool[A, B] = + for peerId, pindex in pool.registry: + yield (peerId, pool.storage[int(pindex)].data) + +proc resort[A, B](pool: PeerPool[A, B], + unsorted: openArray[PeerIndex]): seq[PeerIndex] = + mixin `cmp` + proc pcmp(a, b: PeerIndex): int {.closure, raises: [].} = + cmp(pool.storage[int(a)].data, pool.storage[int(b)].data) + unsorted.sorted(pcmp, order = SortOrder.Descending) + +proc addToStorage[A, B](pool: PeerPool[A, B], item: PeerItem[A]): PeerIndex = + var indexedItem = item + if len(pool.empties) > 0: + indexedItem.index = pool.empties[0] + pool.storage[int(indexedItem.index)] = indexedItem + pool.empties.del(0) + else: + indexedItem.index = PeerIndex(len(pool.storage)) + pool.storage.add(indexedItem) + indexedItem.index + +proc newPeerPool*[A, B]( + maxPeers = -1, + maxIncomingPeers = -1, + maxOutgoingPeers = -1, + scoreCheckCb: PeerScoreCheckCallback[A] = nil, + peerCounterCb: PeerCounterCallback = nil, + onDeleteCb: PeerOnDeleteCallback[A] = nil +): PeerPool[A, B] = ## Create new PeerPool. ## ## ``maxPeers`` - maximum number of peers allowed. All the peers which @@ -178,40 +119,38 @@ proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1, ## ## Please note, that if ``maxPeers`` is positive non-zero value, then equation ## ``maxPeers >= maxIncomingPeers + maxOutgoingPeers`` must be ``true``. - var res = PeerPool[A, B]() if maxPeers != -1: doAssert(maxPeers >= maxIncomingPeers + maxOutgoingPeers) - res.maxPeersCount = if maxPeers < 0: high(int) else: maxPeers - res.maxIncPeersCount = - if maxIncomingPeers < 0: - high(int) - else: - maxIncomingPeers - res.maxOutPeersCount = - if maxOutgoingPeers < 0: - high(int) - else: - maxOutgoingPeers - - res.incNotEmptyEvent = newAsyncEvent() - res.outNotEmptyEvent = newAsyncEvent() - res.incNotFullEvent = newAsyncEvent() - res.outNotFullEvent = newAsyncEvent() - res.incQueue = initHeapQueue[PeerIndex]() - res.outQueue = initHeapQueue[PeerIndex]() - res.registry = initTable[B, PeerIndex]() - res.scoreCheck = scoreCheckCb - res.peerCounter = peerCounterCb - res.onDeletePeer = onDeleteCb - res.storage = newSeq[PeerItem[A]]() - - proc peerCmp(a, b: PeerIndex): bool {.closure, gcsafe.} = - let p1 = res.storage[a.data].data - let p2 = res.storage[b.data].data - p1 < p2 - - res.cmp = peerCmp + let + maxPeersCount = if maxPeers < 0: high(int) else: maxPeers + maxIncPeersCount = + if maxIncomingPeers < 0: + high(int) + else: + maxIncomingPeers + maxOutPeersCount = + if maxOutgoingPeers < 0: + high(int) + else: + maxOutgoingPeers + res = PeerPool[A, B]( + changeEvent: newAsyncEvent(), + registry: initTable[B, PeerIndex](), + scoreCheck: scoreCheckCb, + peerCounter: peerCounterCb, + onDeletePeer: onDeleteCb, + storage: newSeq[PeerItem[A]](), + unsorted: newSeq[PeerIndex](), + sorted: newSeq[PeerIndex](), + maxPeersCount: maxPeersCount, + maxIncPeersCount: maxIncPeersCount, + maxOutPeersCount: maxOutPeersCount, + curIncPeersCount: 0, + curOutPeersCount: 0, + acqIncPeersCount: 0, + acqOutPeersCount: 0 + ) res proc len*[A, B](pool: PeerPool[A, B]): int = @@ -227,44 +166,80 @@ proc lenCurrent*[A, B](pool: PeerPool[A, B], (if PeerType.Incoming in filter: pool.curIncPeersCount else: 0) + (if PeerType.Outgoing in filter: pool.curOutPeersCount else: 0) -proc lenAvailable*[A, B](pool: PeerPool[A, B], - filter = {PeerType.Incoming, - PeerType.Outgoing}): int {.inline.} = - ## Returns number of available peers in PeerPool ``pool`` which satisfies - ## filter ``filter``. - (if PeerType.Incoming in filter: len(pool.incQueue) else: 0) + - (if PeerType.Outgoing in filter: len(pool.outQueue) else: 0) +proc lenAvailable*[A, B]( + pool: PeerPool[A, B], + filter = {PeerType.Incoming, PeerType.Outgoing} +): int {.inline.} = + ## Returns number of peers available for acquisition in PeerPool + ## ``pool`` which satisfies filter ``filter``. + (if PeerType.Incoming in filter: + pool.curIncPeersCount - pool.acqIncPeersCount + else: + 0) + + (if PeerType.Outgoing in filter: + pool.curOutPeersCount - pool.acqOutPeersCount + else: + 0) + +proc lenAvailable*[A, B]( + pool: PeerPool[A, B], + filter: set[PeerType], + customFilter: PeerCustomFilterCallback[A] +): int = + ## Returns number of peers available for acquisition in PeerPool + ## ``pool`` which satisfies filter ``filter`` and custom filter + ## ``customFilter``. + ## Note: This is O(n) operation. + let available = pool.lenAvailable(filter) + var res = 0 + for sindex, pindex in pool.sorted.pairs(): + let item = addr(pool.storage[int(pindex)]) + if (PeerFlags.Acquired notin item[].flags) and + (item[].peerType in filter) and + (isNil(customFilter) or customFilter(item[].data)): + inc(res) + if res == available: + # Number of customly filtered items could not be higher than number of + # peers of specific directions. + break + res -proc lenAcquired*[A, B](pool: PeerPool[A, B], - filter = {PeerType.Incoming, - PeerType.Outgoing}): int {.inline.} = +proc lenAcquired*[A, B]( + pool: PeerPool[A, B], + filter = {PeerType.Incoming, PeerType.Outgoing} +): int {.inline.} = ## Returns number of acquired peers in PeerPool ``pool`` which satisifies ## filter ``filter``. (if PeerType.Incoming in filter: pool.acqIncPeersCount else: 0) + (if PeerType.Outgoing in filter: pool.acqOutPeersCount else: 0) -proc lenSpace*[A, B](pool: PeerPool[A, B], - filter = {PeerType.Incoming, - PeerType.Outgoing}): int {.inline.} = +proc lenSpace*[A, B]( + pool: PeerPool[A, B], + filter = {PeerType.Incoming, PeerType.Outgoing} +): int {.inline.} = ## Returns number of available space for peers in PeerPool ``pool`` which ## satisfies filter ``filter``. - let curPeersCount = pool.curIncPeersCount + pool.curOutPeersCount - let totalSpace = pool.maxPeersCount - curPeersCount - let incoming = min(totalSpace, pool.maxIncPeersCount - pool.curIncPeersCount) - let outgoing = min(totalSpace, pool.maxOutPeersCount - pool.curOutPeersCount) + let + curPeersCount = pool.curIncPeersCount + pool.curOutPeersCount + spaceAvailable = pool.maxPeersCount - curPeersCount + incoming = min(spaceAvailable, + pool.maxIncPeersCount - pool.curIncPeersCount) + outgoing = min(spaceAvailable, + pool.maxOutPeersCount - pool.curOutPeersCount) if filter == {PeerType.Incoming, PeerType.Outgoing}: # To avoid overflow check we need to check by ourself. if uint64(incoming) + uint64(outgoing) > uint64(high(int)): - min(totalSpace, high(int)) + min(spaceAvailable, high(int)) else: - min(totalSpace, incoming + outgoing) + min(spaceAvailable, incoming + outgoing) elif PeerType.Incoming in filter: incoming else: outgoing proc shortLogAvailable*[A, B](pool: PeerPool[A, B]): string = - $len(pool.incQueue) & "/" & $len(pool.outQueue) + $pool.lenAvailable({PeerType.Incoming}) & "/" & + $pool.lenAvailable({PeerType.Outgoing}) proc shortLogAcquired*[A, B](pool: PeerPool[A, B]): string = $pool.acqIncPeersCount & "/" & $pool.acqOutPeersCount @@ -293,6 +268,25 @@ proc peerDeleted[A, B](pool: PeerPool[A, B], peer: A) = if not(isNil(pool.onDeletePeer)): pool.onDeletePeer(peer) +proc deletePeerImpl[A, B]( + pool: PeerPool[A, B], + peer: A, + key: B, + pindex: PeerIndex +) = + pool.storage[int(pindex)] = PeerItem[A](index: PeerIndex(-1)) + pool.empties.add(pindex) + pool.registry.del(key) + let uindex = pool.unsorted.find(pindex) + # `unsorted` keeps only peers which are available for acquisition, + if uindex >= 0: + pool.unsorted.del(uindex) + pool.sorted = pool.resort(pool.unsorted) + # Indicate that we have an empty space + pool.changeEvent.fire() + pool.peerDeleted(peer) + pool.peerCountChanged() + proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = ## Remove ``peer`` from PeerPool ``pool``. ## @@ -300,78 +294,59 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = ## be deleted only when peer will be released. You can change this behavior ## with ``force`` option. mixin getKey - let key = getKey(peer) - if pool.registry.hasKey(key): - let pindex = try: pool.registry[key].data - except KeyError: raiseAssert "checked with hasKey" - var item = addr(pool.storage[pindex]) - if (PeerFlags.Acquired in item[].flags): - if not(force): - item[].flags.incl(PeerFlags.DeleteOnRelease) - else: - if item[].peerType == PeerType.Incoming: - dec(pool.curIncPeersCount) - dec(pool.acqIncPeersCount) - elif item[].peerType == PeerType.Outgoing: - dec(pool.curOutPeersCount) - dec(pool.acqOutPeersCount) - - # Indicate that we have an empty space - pool.fireNotFullEvent(item[]) - # Cleanup storage with default item, and removing key from hashtable. - pool.storage[pindex] = PeerItem[A]() - pool.registry.del(key) - pool.peerDeleted(peer) - pool.peerCountChanged() + let + key = peer.getKey() + pindex = + block: + let res = pool.registry.getOrDefault(key, PeerIndex(-1)) + if res == PeerIndex(-1): + return false + res + + var item = addr(pool.storage[int(pindex)]) + if (PeerFlags.Acquired in item[].flags): + if not(force): + item[].flags.incl(PeerFlags.DeleteOnRelease) else: - if item[].peerType == PeerType.Incoming: - # If peer is available, then its copy present in heapqueue, so we need - # to remove it. - for i in 0 ..< len(pool.incQueue): - if pool.incQueue[i].data == pindex: - pool.incQueue.del(i) - break + case item[].peerType + of PeerType.Incoming: + dec(pool.acqIncPeersCount) dec(pool.curIncPeersCount) - elif item[].peerType == PeerType.Outgoing: - # If peer is available, then its copy present in heapqueue, so we need - # to remove it. - for i in 0 ..< len(pool.outQueue): - if pool.outQueue[i].data == pindex: - pool.outQueue.del(i) - break + of PeerType.Outgoing: + dec(pool.acqOutPeersCount) dec(pool.curOutPeersCount) - - # Indicate that we have an empty space - pool.fireNotFullEvent(item[]) - # Cleanup storage with default item, and removing key from hashtable. - pool.storage[pindex] = PeerItem[A]() - pool.registry.del(key) - pool.peerDeleted(peer) - pool.peerCountChanged() - true + pool.deletePeerImpl(peer, key, pindex) else: - false + case item[].peerType + of PeerType.Incoming: + dec(pool.curIncPeersCount) + of PeerType.Outgoing: + dec(pool.curOutPeersCount) + pool.deletePeerImpl(peer, key, pindex) + + true proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B, peerType: PeerType) = + mixin getFuture proc onPeerClosed(udata: pointer) {.gcsafe, raises: [].} = discard pool.deletePeer(peer) - let item = PeerItem[A](data: peer, peerType: peerType, - index: len(pool.storage)) - pool.storage.add(item) - var pitem = addr(pool.storage[^1]) - let pindex = PeerIndex(data: item.index, cmp: pool.cmp) + let + item = PeerItem[A](data: peer, peerType: peerType) + pindex = pool.addToStorage(item) + pitem = addr(pool.storage[int(pindex)]) + pool.registry[peerKey] = pindex + pool.unsorted.add(pindex) + pool.sorted = pool.resort(pool.unsorted) pitem[].data.getFuture().addCallback(onPeerClosed) - if peerType == PeerType.Incoming: + case peerType + of PeerType.Incoming: inc(pool.curIncPeersCount) - pool.incQueue.push(pindex) - pool.incNotEmptyEvent.fire() - elif peerType == PeerType.Outgoing: + of PeerType.Outgoing: inc(pool.curOutPeersCount) - pool.outQueue.push(pindex) - pool.outNotEmptyEvent.fire() + pool.changeEvent.fire() pool.peerCountChanged() proc checkPeer*[A, B](pool: PeerPool[A, B], peer: A): PeerStatus {.inline.} = @@ -395,8 +370,11 @@ proc checkPeer*[A, B](pool: PeerPool[A, B], peer: A): PeerStatus {.inline.} = else: PeerStatus.DuplicateError -proc addPeerNoWait*[A, B](pool: PeerPool[A, B], - peer: A, peerType: PeerType): PeerStatus = +proc addPeerNoWait*[A, B]( + pool: PeerPool[A, B], + peer: A, + peerType: PeerType +): PeerStatus = ## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``. ## ## Procedure returns ``PeerStatus`` @@ -428,41 +406,21 @@ proc addPeerNoWait*[A, B](pool: PeerPool[A, B], else: PeerStatus.NoSpaceError -proc getPeerSpaceMask[A, B](pool: PeerPool[A, B], - peerType: PeerType): set[PeerType] {.inline.} = - ## This procedure returns set of events which you need to wait to get empty - ## space for peer type ``peerType``. This set can be used for call to - ## ``waitNotFullEvent()``. - case peerType: - of PeerType.Incoming: - if pool.maxIncPeersCount >= pool.maxPeersCount: - # If maximum number of `incoming` peers is only limited by - # maximum number of peers, then we could wait for both events. - # It means that we do not care about what peer will left pool. - {PeerType.Incoming, PeerType.Outgoing} - else: - # Otherwise we could wait only for `incoming` event - {PeerType.Incoming} - of PeerType.Outgoing: - if pool.maxOutPeersCount >= pool.maxPeersCount: - # If maximum number of `outgoing` peers is only limited by - # maximum number of peers, then we could wait for both events. - # It means that we do not care about what peer will left pool. - {PeerType.Incoming, PeerType.Outgoing} - else: - # Otherwise we could wait only for `outgoing` event - {PeerType.Outgoing} - -proc waitForEmptySpace*[A, B](pool: PeerPool[A, B], - peerType: PeerType) {.async: (raises: [CancelledError]).} = +proc waitForEmptySpace*[A, B]( + pool: PeerPool[A, B], + peerType: PeerType +) {.async: (raises: [CancelledError]).} = ## This procedure will block until ``pool`` will have an empty space for peer ## of type ``peerType``. - let mask = pool.getPeerSpaceMask(peerType) while pool.lenSpace({peerType}) == 0: - await pool.waitNotFullEvent(mask) + await pool.changeEvent.wait() + pool.changeEvent.clear() -proc addPeer*[A, B](pool: PeerPool[A, B], - peer: A, peerType: PeerType): Future[PeerStatus] {.async: (raises: [CancelledError]).} = +proc addPeer*[A, B]( + pool: PeerPool[A, B], + peer: A, + peerType: PeerType +): Future[PeerStatus] {.async: (raises: [CancelledError]).} = ## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``. ## ## This procedure will wait for an empty space in PeerPool ``pool``, if @@ -474,123 +432,166 @@ proc addPeer*[A, B](pool: PeerPool[A, B], ## ## Procedure returns (PeerStatus.Success) on success. mixin getKey - let res = + + template check(peer: untyped) = + let res = pool.checkPeer(peer) + if res != PeerStatus.Success: + return res + + while pool.lenSpace({peerType}) == 0: + peer.check() + await pool.changeEvent.wait() + pool.changeEvent.clear() + + # Because we could wait for a long time we need to check peer one more + # time to avoid race condition. + peer.check() + + pool.addPeerImpl(peer, peer.getKey(), peerType) + PeerStatus.Success + +proc acquireItemImpl[A, B]( + pool: PeerPool[A, B], + filter: set[PeerType], + customFilter: PeerCustomFilterCallback[A] = nil +): A {.inline.} = + let (sindex, pitem) = block: - let res1 = pool.checkPeer(peer) - if res1 != PeerStatus.Success: - res1 - else: - let mask = pool.getPeerSpaceMask(peerType) - # We going to block here until ``pool`` will not have free space, - # for our type of peer. - while pool.lenSpace({peerType}) == 0: - await pool.waitNotFullEvent(mask) - # Because we could wait for a long time we need to check peer one more - # time to avoid race condition. - let res2 = pool.checkPeer(peer) - if res2 == PeerStatus.Success: - let peerKey = peer.getKey() - pool.addPeerImpl(peer, peerKey, peerType) - PeerStatus.Success - else: - res2 - return res - -proc acquireItemImpl[A, B](pool: PeerPool[A, B], - filter: set[PeerType]): A {.inline.} = - doAssert((len(pool.outQueue) > 0) or (len(pool.incQueue) > 0)) - let pindex = - if filter == {PeerType.Incoming, PeerType.Outgoing}: - if len(pool.outQueue) > 0 and len(pool.incQueue) > 0: - # `<` here is the `PeerIndex` implementation (`HeapQueue` uses `<`), - # which then flips the arguments to rank `>` on `A` using `pool.cmp` - if pool.incQueue[0] < pool.outQueue[0]: - inc(pool.acqIncPeersCount) - let item = pool.incQueue.pop() - item.data - else: - inc(pool.acqOutPeersCount) - let item = pool.outQueue.pop() - item.data - else: - if len(pool.outQueue) > 0: - inc(pool.acqOutPeersCount) - let item = pool.outQueue.pop() - item.data - else: - inc(pool.acqIncPeersCount) - let item = pool.incQueue.pop() - item.data - else: - if PeerType.Outgoing in filter: - inc(pool.acqOutPeersCount) - let item = pool.outQueue.pop() - item.data - else: - inc(pool.acqIncPeersCount) - let item = pool.incQueue.pop() - item.data - var pitem = addr(pool.storage[pindex]) + var + rindex: PeerIndex = PeerIndex(-1) + res: ptr PeerItem[A] = nil + for sindex, pindex in pool.sorted.pairs(): + res = addr(pool.storage[int(pindex)]) + if (PeerFlags.Acquired notin res[].flags) and + (res[].peerType in filter) and + (isNil(customFilter) or customFilter(res[].data)): + rindex = PeerIndex(sindex) + break + (rindex, res) + + doAssert(not(isNil(pitem))) doAssert(PeerFlags.Acquired notin pitem[].flags) + doAssert(len(pool.sorted) == len(pool.unsorted)) + let uindex = pool.unsorted.find(pool.sorted[int(sindex)]) + doAssert(uindex >= 0, "unsorted and sorted arrays should be equal!") + + case pitem[].peerType + of PeerType.Incoming: + inc(pool.acqIncPeersCount) + of PeerType.Outgoing: + inc(pool.acqOutPeersCount) + + pool.unsorted.del(uindex) + pool.sorted = pool.resort(pool.unsorted) + pitem[].flags.incl(PeerFlags.Acquired) pitem[].data -proc acquire*[A, B](pool: PeerPool[A, B], - filter = {PeerType.Incoming, - PeerType.Outgoing}): Future[A] {.async: (raises: [CancelledError]).} = +proc acquire*[A, B]( + pool: PeerPool[A, B], + filter = {PeerType.Incoming, PeerType.Outgoing} +): Future[A] {.async: (raises: [CancelledError]).} = ## Acquire peer from PeerPool ``pool``, which match the filter ``filter``. + ## This procedure will wait for peer which satisfy filter will become + ## available for acquisition. mixin getKey doAssert(filter != {}, "Filter must not be empty") while true: if pool.lenAvailable(filter) == 0: - await pool.waitNotEmptyEvent(filter) + await pool.changeEvent.wait() + pool.changeEvent.clear() else: - return pool.acquireItemImpl(filter) - -proc acquireNoWait*[A, B](pool: PeerPool[A, B], - filter = {PeerType.Incoming, - PeerType.Outgoing} - ): A {.raises: [PeerPoolError].} = + return pool.acquireItemImpl(filter, nil) + +proc acquire*[A, B]( + pool: PeerPool[A, B], + filter: set[PeerType], + customFilter: PeerCustomFilterCallback[A] +): Future[A] {.async: (raises: [CancelledError]).} = + ## Acquire peer from PeerPool ``pool``, which match the filter ``filter`` and + ## custom filter ``customFilter``. This procedure will wait for peer which + ## satisfy filters will become available for acquisition. + mixin getKey + doAssert(filter != {}, "Filter must not be empty") + while true: + if pool.lenAvailable(filter, customFilter) == 0: + await pool.changeEvent.wait() + pool.changeEvent.clear() + else: + return pool.acquireItemImpl(filter, customFilter) + +proc acquireNoWait*[A, B]( + pool: PeerPool[A, B], + filter = {PeerType.Incoming, PeerType.Outgoing} +): A {.raises: [PeerPoolError].} = + ## Acquire peer from PeerPool ``pool``, which match the filter ``filter`` + ## without waiting, this procedure will raise PeerPoolError if no peers + ## which satisfy filters are available for acquisition. doAssert(filter != {}, "Filter must not be empty") if pool.lenAvailable(filter) < 1: raise newException(PeerPoolError, "Not enough peers in pool") - pool.acquireItemImpl(filter) + pool.acquireItemImpl(filter, nil) + +proc acquireNoWait*[A, B]( + pool: PeerPool[A, B], + filter: set[PeerType], + customFilter: PeerCustomFilterCallback[A] +): A {.raises: [PeerPoolError].} = + ## Acquire peer from PeerPool ``pool``, which match the filter ``filter`` and + ## custom filter ``customFilter`` without waiting, this procedure will raise + ## PeerPoolError if no peers which satisfy filters are available for + ## acquisition. + doAssert(filter != {}, "Filter must not be empty") + if pool.lenAvailable(filter, customFilter) < 1: + raise newException(PeerPoolError, "Not enough peers in pool") + pool.acquireItemImpl(filter, customFilter) proc release*[A, B](pool: PeerPool[A, B], peer: A) = ## Release peer ``peer`` back to PeerPool ``pool`` mixin getKey - let key = getKey(peer) - var titem = pool.registry.getOrDefault(key, PeerIndex(data: -1)) - if titem.data >= 0: - let pindex = titem.data - var item = addr(pool.storage[pindex]) - if PeerFlags.Acquired in item[].flags: - if not(pool.checkPeerScore(peer)): - item[].flags.incl(DeleteOnRelease) - if PeerFlags.DeleteOnRelease in item[].flags: - # We do not care about result here because peer is present in registry - # and has all proper flags set. - discard pool.deletePeer(peer, force = true) - else: - item[].flags.excl(PeerFlags.Acquired) - case item[].peerType - of PeerType.Incoming: - pool.incQueue.push(titem) - dec(pool.acqIncPeersCount) - of PeerType.Outgoing: - pool.outQueue.push(titem) - dec(pool.acqOutPeersCount) - pool.fireNotEmptyEvent(item[]) + let + key = peer.getKey() + pindex = + block: + let res = pool.registry.getOrDefault(key, PeerIndex(-1)) + if res == PeerIndex(-1): + return + res + item = addr(pool.storage[int(pindex)]) + + if PeerFlags.Acquired in item[].flags: + if not(pool.checkPeerScore(peer)): + item[].flags.incl(DeleteOnRelease) + if PeerFlags.DeleteOnRelease in item[].flags: + case item[].peerType + of PeerType.Incoming: + dec(pool.acqIncPeersCount) + dec(pool.curIncPeersCount) + of PeerType.Outgoing: + dec(pool.acqOutPeersCount) + dec(pool.curOutPeersCount) + pool.deletePeerImpl(peer, key, pindex) + else: + item[].flags.excl(PeerFlags.Acquired) + case item[].peerType + of PeerType.Incoming: + dec(pool.acqIncPeersCount) + of PeerType.Outgoing: + dec(pool.acqOutPeersCount) + pool.unsorted.add(pindex) + pool.sorted = pool.resort(pool.unsorted) + pool.changeEvent.fire() proc release*[A, B](pool: PeerPool[A, B], peers: openArray[A]) {.inline.} = ## Release array of peers ``peers`` back to PeerPool ``pool``. for item in peers: pool.release(item) -proc acquire*[A, B](pool: PeerPool[A, B], - number: int, - filter = {PeerType.Incoming, - PeerType.Outgoing}): Future[seq[A]] {.async: (raises: [CancelledError]).} = +proc acquire*[A, B]( + pool: PeerPool[A, B], + number: int, + filter = {PeerType.Incoming, PeerType.Outgoing} +): Future[seq[A]] {.async: (raises: [CancelledError]).} = ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the ## filter ``filter``. doAssert(filter != {}, "Filter must not be empty") @@ -601,7 +602,8 @@ proc acquire*[A, B](pool: PeerPool[A, B], if len(peers) >= number: break if pool.lenAvailable(filter) == 0: - await pool.waitNotEmptyEvent(filter) + await pool.changeEvent.wait() + pool.changeEvent.clear() else: peers.add(pool.acquireItemImpl(filter)) except CancelledError as exc: @@ -611,96 +613,205 @@ proc acquire*[A, B](pool: PeerPool[A, B], pool.release(item) peers.setLen(0) raise exc - return peers + peers + +proc acquire*[A, B]( + pool: PeerPool[A, B], + number: int, + filter: set[PeerType], + customFilter: PeerCustomFilterCallback[A] +): Future[seq[A]] {.async: (raises: [CancelledError]).} = + ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the + ## filter ``filter`` and custom filter ``customFilter``. This procedure will + ## wait for ``number`` of peers which satisfy filter will become available + ## and acquired. + doAssert(filter != {}, "Filter must not be empty") + var peers = newSeq[A]() + try: + if number > 0: + while true: + if len(peers) >= number: + break + if pool.lenAvailable(filter, customFilter) == 0: + await pool.changeEvent.wait() + pool.changeEvent.clear() + else: + peers.add(pool.acquireItemImpl(filter, customFilter)) + except CancelledError as exc: + # If we got cancelled, we need to return all the acquired peers back to + # pool. + for item in peers: + pool.release(item) + peers.setLen(0) + raise exc + peers -proc acquireNoWait*[A, B](pool: PeerPool[A, B], - number: int, - filter = {PeerType.Incoming, - PeerType.Outgoing}): seq[A] = +proc acquireNoWait*[A, B]( + pool: PeerPool[A, B], + number: int, + filter = {PeerType.Incoming, PeerType.Outgoing} +): seq[A] = ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the - ## filter ``filter``. + ## filter ``filter``. This procedure does not wait for peers, it will raise + ## `PeerPoolError` if peers matching the filters are not available. doAssert(filter != {}, "Filter must not be empty") var peers = newSeq[A]() if pool.lenAvailable(filter) < number: raise newException(PeerPoolError, "Not enough peers in pool") for i in 0 ..< number: peers.add(pool.acquireItemImpl(filter)) - return peers + peers + +proc acquireNoWait*[A, B]( + pool: PeerPool[A, B], + number: int, + filter: set[PeerType], + customFilter: PeerCustomFilterCallback[A] +): seq[A] = + ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the + ## filter ``filter`` and custom filter ``filter``. This procedure does not + ## wait for peers, it will raise `PeerPoolError` if peers matching the + ## filters are not available. + doAssert(filter != {}, "Filter must not be empty") + var peers = newSeq[A]() + if pool.lenAvailable(filter, customFilter) < number: + raise newException(PeerPoolError, "Not enough peers in pool") + for i in 0 ..< number: + peers.add(pool.acquireItemImpl(filter, customFilter)) + peers -proc acquireIncomingPeer*[A, B](pool: PeerPool[A, B]): Future[A] {.inline.} = +proc acquireIncomingPeer*[A, B]( + pool: PeerPool[A, B] +): Future[A] {.async: (raises: [CancelledError], raw: true).} = ## Acquire single incoming peer from PeerPool ``pool``. pool.acquire({PeerType.Incoming}) -proc acquireOutgoingPeer*[A, B](pool: PeerPool[A, B]): Future[A] {.inline.} = +proc acquireOutgoingPeer*[A, B]( + pool: PeerPool[A, B] +): Future[A] {.async: (raises: [CancelledError], raw: true).} = ## Acquire single outgoing peer from PeerPool ``pool``. pool.acquire({PeerType.Outgoing}) -proc acquireIncomingPeers*[A, B](pool: PeerPool[A, B], - number: int): Future[seq[A]] {.inline.} = +proc acquireIncomingPeers*[A, B]( + pool: PeerPool[A, B], + number: int +): Future[seq[A]] {.async: (raises: [CancelledError], raw: true).} = ## Acquire ``number`` number of incoming peers from PeerPool ``pool``. pool.acquire(number, {PeerType.Incoming}) -proc acquireOutgoingPeers*[A, B](pool: PeerPool[A, B], - number: int): Future[seq[A]] {.inline.} = +proc acquireOutgoingPeers*[A, B]( + pool: PeerPool[A, B], + number: int +): Future[seq[A]] {.async: (raises: [CancelledError], raw: true).} = ## Acquire ``number`` number of outgoing peers from PeerPool ``pool``. pool.acquire(number, {PeerType.Outgoing}) -iterator peers*[A, B](pool: PeerPool[A, B], - filter = {PeerType.Incoming, - PeerType.Outgoing}): A = +iterator peers*[A, B]( + pool: PeerPool[A, B], + filter = {PeerType.Incoming, PeerType.Outgoing} +): A = + ## Iterate over sorted list of peers. + ## + ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values + ## will be first. + var unsorted: seq[PeerIndex] + for pindex in pool.registry.values(): + if pool.storage[int(pindex)].peerType in filter: + unsorted.add(pindex) + + let sorted = pool.resort(unsorted).mapIt(pool.storage[int(it)].data) + + for item in sorted: + yield item + +iterator peers*[A, B]( + pool: PeerPool[A, B], + filter: set[PeerType], + customFilter: PeerCustomFilterCallback[A] +): A = ## Iterate over sorted list of peers. ## ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. - var sorted = initHeapQueue[PeerIndex]() - for peerIdx in pool.registry.values(): - if pool.storage[peerIdx.data].peerType in filter: - sorted.push(peerIdx) - while len(sorted) > 0: - let peerIdx = sorted.pop() - yield pool.storage[peerIdx.data].data - -iterator availablePeers*[A, B](pool: PeerPool[A, B], - filter = {PeerType.Incoming, - PeerType.Outgoing}): A = + var unsorted: seq[PeerIndex] + for pindex in pool.registry.values(): + let item = addr(pool.storage[int(pindex)]) + if (item[].peerType in filter) and + (isNil(customFilter) or customFilter(item[].data)): + unsorted.add(pindex) + + let sorted = pool.resort(unsorted).mapIt(pool.storage[int(it)].data) + + for item in sorted: + yield item + +iterator availablePeers*[A, B]( + pool: PeerPool[A, B], + filter = {PeerType.Incoming, PeerType.Outgoing} +): A = + ## Iterate over sorted list of available peers. + ## + ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values + ## will be first. + for pindex in pool.sorted: + if (PeerFlags.Acquired notin pool.storage[int(pindex)].flags) and + (pool.storage[int(pindex)].peerType in filter): + yield pool.storage[int(pindex)].data + +iterator availablePeers*[A, B]( + pool: PeerPool[A, B], + filter: set[PeerType], + customFilter: PeerCustomFilterCallback[A] +): A = ## Iterate over sorted list of available peers. ## ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. - var sorted = initHeapQueue[PeerIndex]() - for peerIdx in pool.registry.values(): - if (PeerFlags.Acquired notin pool.storage[peerIdx.data].flags) and - (pool.storage[peerIdx.data].peerType in filter): - sorted.push(peerIdx) - while len(sorted) > 0: - let peerIdx = sorted.pop() - yield pool.storage[peerIdx.data].data - -iterator acquiredPeers*[A, B](pool: PeerPool[A, B], - filter = {PeerType.Incoming, - PeerType.Outgoing}): A = + for pindex in pool.sorted: + let item = addr(pool.storage[int(pindex)]) + if (PeerFlags.Acquired notin item[].flags) and + (item[].peerType in filter) and + (isNil(customFilter) or customFilter(item[].data)): + yield item[].data + +iterator acquiredPeers*[A, B]( + pool: PeerPool[A, B], + filter = {PeerType.Incoming, PeerType.Outgoing} +): A = ## Iterate over sorted list of acquired (non-available) peers. ## ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. - var sorted = initHeapQueue[PeerIndex]() - for peerIdx in pool.registry.values(): - if (PeerFlags.Acquired in pool.storage[peerIdx.data].flags) and - (pool.storage[peerIdx.data].peerType in filter): - sorted.push(peerIdx) - while len(sorted) > 0: - let peerIdx = sorted.pop() - yield pool.storage[peerIdx.data].data - -proc `[]`*[A, B](pool: PeerPool[A, B], key: B): A {.inline, raises: [KeyError].} = + mixin `cmp` + proc pcmp(a, b: PeerIndex): int {.closure, raises: [].} = + cmp(pool.storage[int(a)].data, pool.storage[int(b)].data) + + var unsorted: seq[PeerIndex] + for pindex in pool.registry.values(): + if (PeerFlags.Acquired in pool.storage[int(pindex)].flags) and + (pool.storage[int(pindex)].peerType in filter): + unsorted.add(pindex) + + let sorted = + unsorted.sorted(pcmp, order = SortOrder.Descending). + mapIt(pool.storage[int(it)].data) + + for item in sorted: + yield item + +proc `[]`*[A, B]( + pool: PeerPool[A, B], + key: B +): A {.inline, raises: [KeyError].} = ## Retrieve peer with key ``key`` from PeerPool ``pool``. - let pindex = pool.registry[key] - pool.storage[pindex.data] + pool.storage[int(pool.registry[key])].data -proc `[]`*[A, B](pool: var PeerPool[A, B], key: B): var A {.inline, raises: [KeyError].} = +proc `[]`*[A, B]( + pool: var PeerPool[A, B], + key: B +): var A {.inline, raises: [KeyError].} = ## Retrieve peer with key ``key`` from PeerPool ``pool``. - let pindex = pool.registry[key] - pool.storage[pindex.data].data + pool.storage[int(pool.registry[key])].data proc hasPeer*[A, B](pool: PeerPool[A, B], key: B): bool {.inline.} = ## Returns ``true`` if peer with ``key`` present in PeerPool ``pool``. @@ -710,9 +821,9 @@ proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B): A {.inline.} = ## Retrieves the peer from PeerPool ``pool`` using key ``key``. If peer is ## not present, default initialization value for type ``A`` is returned ## (e.g. 0 for any integer type). - let pindex = pool.registry.getOrDefault(key, PeerIndex(data: -1)) - if pindex.data >= 0: - pool.storage[pindex.data].data + let pindex = pool.registry.getOrDefault(key, PeerIndex(-1)) + if pindex != PeerIndex(-1): + pool.storage[int(pindex)].data else: A() @@ -720,17 +831,17 @@ proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B, default: A): A {.inline.} = ## Retrieves the peer from PeerPool ``pool`` using key ``key``. If peer is ## not present, default value ``default`` is returned. - let pindex = pool.registry.getOrDefault(key, PeerIndex(data: -1)) - if pindex.data >= 0: - pool.storage[pindex.data].data + let pindex = pool.registry.getOrDefault(key, PeerIndex(-1)) + if pindex != PeerIndex(-1): + pool.storage[int(pindex)].data else: default proc clear*[A, B](pool: PeerPool[A, B]) = ## Performs PeerPool's ``pool`` storage and counters reset. - pool.incQueue.clear() - pool.outQueue.clear() pool.registry.clear() + pool.sorted.reset() + pool.unsorted.reset() for i in 0 ..< len(pool.storage): pool.storage[i] = PeerItem[A]() pool.storage.setLen(0) @@ -739,10 +850,12 @@ proc clear*[A, B](pool: PeerPool[A, B]) = pool.acqIncPeersCount = 0 pool.acqOutPeersCount = 0 -proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async: (raises: [CancelledError]).} = +proc clearSafe*[A, B]( + pool: PeerPool[A, B] +) {.async: (raises: [CancelledError]).} = ## Performs "safe" clear. Safe means that it first acquires all the peers ## in PeerPool, and only after that it will reset storage. - var acquired = newSeq[A]() + var acquired: seq[A] while len(pool.registry) > len(acquired): var peers = await pool.acquire(len(pool.registry) - len(acquired)) for item in peers: diff --git a/tests/test_peer_pool.nim b/tests/test_peer_pool.nim index bf648e07b1..6a50e5f965 100644 --- a/tests/test_peer_pool.nim +++ b/tests/test_peer_pool.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2019-2024 Status Research & Development GmbH +# Copyright (c) 2019-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -8,8 +8,8 @@ {.push raises: [].} {.used.} -import std/[random, heapqueue, tables] -import chronos +import std/[random, heapqueue, tables, sequtils, strutils] +import chronos, chronos/unittest2/asynctests import ../beacon_chain/networking/peer_pool import ./testutil @@ -21,6 +21,7 @@ type PeerTest = object id: PeerTestID weight: int + metadata: uint64 future: Future[void] func getKey(peer: PeerTest): PeerTestID = @@ -29,6 +30,9 @@ func getKey(peer: PeerTest): PeerTestID = func getFuture(peer: PeerTest): Future[void] = peer.future +func getMetadata(peer: PeerTest): uint64 = + peer.metadata + func `<`(a, b: PeerTest): bool = `<`(a.weight, b.weight) @@ -36,6 +40,14 @@ proc init*(t: typedesc[PeerTest], id: string = "", weight: int = 0): PeerTest = PeerTest(id: id, weight: weight, future: newFuture[void]()) +proc init*(t: typedesc[PeerTest], id: string = "", + weight: int = 0, metadata: uint64): PeerTest = + PeerTest(id: id, weight: weight, future: newFuture[void](), + metadata: metadata) + +proc toString(a: openArray[PeerTest]): string = + "[" & a.mapIt(it.getKey()).join(",") & "]" + proc close(peer: PeerTest) = peer.future.complete() @@ -591,6 +603,285 @@ suite "PeerPool testing suite": len(acqui2) == 2 len(acqui3) == 1 + asyncTest "Custom filters test": + var pool = newPeerPool[PeerTest, PeerTestID]() + let + peer1 = PeerTest.init("peer1", 10, 256'u64) + peer2 = PeerTest.init("peer2", 9, 0'u64) + peer3 = PeerTest.init("peer3", 8, 4'u64) + peer4 = PeerTest.init("peer4", 7, 2'u64) + peer5 = PeerTest.init("peer5", 6, 2'u64) + peer6 = PeerTest.init("peer6", 5, 2'u64) + peer7 = PeerTest.init("peer7", 4, 4'u64) + peer8 = PeerTest.init("peer8", 3, 128'u64) + peer9 = PeerTest.init("peer9", 2, 4'u64) + peer10 = PeerTest.init("peer10", 1, 256'u64) + + proc custom1(peer: PeerTest): bool = + true + + proc custom2(peer: PeerTest): bool = + if peer.getMetadata() == 2'u64: + true + else: + false + + proc custom3(peer: PeerTest): bool = + if peer.getMetadata() in [2'u64, 4'u64]: + true + else: + false + + check: + pool.addPeerNoWait(peer2, PeerType.Incoming) == PeerStatus.Success + pool.addPeerNoWait(peer3, PeerType.Incoming) == PeerStatus.Success + pool.addPeerNoWait(peer1, PeerType.Incoming) == PeerStatus.Success + pool.addPeerNoWait(peer4, PeerType.Incoming) == PeerStatus.Success + pool.addPeerNoWait(peer5, PeerType.Incoming) == PeerStatus.Success + + pool.addPeerNoWait(peer10, PeerType.Outgoing) == PeerStatus.Success + pool.addPeerNoWait(peer7, PeerType.Outgoing) == PeerStatus.Success + pool.addPeerNoWait(peer6, PeerType.Outgoing) == PeerStatus.Success + pool.addPeerNoWait(peer8, PeerType.Outgoing) == PeerStatus.Success + pool.addPeerNoWait(peer9, PeerType.Outgoing) == PeerStatus.Success + + template checkTotal() = + let + total1 = + pool.peers({PeerType.Incoming, PeerType.Outgoing}, custom1).toSeq() + total2 = + pool.peers({PeerType.Incoming}, custom1).toSeq() + total3 = + pool.peers({PeerType.Outgoing}, custom1).toSeq() + total4 = + pool.peers({PeerType.Incoming, PeerType.Outgoing}, custom2).toSeq() + total5 = + pool.peers({PeerType.Incoming}, custom2).toSeq() + total6 = + pool.peers({PeerType.Outgoing}, custom2).toSeq() + total7 = + pool.peers({PeerType.Incoming, PeerType.Outgoing}, custom3).toSeq() + total8 = + pool.peers({PeerType.Incoming}, custom3).toSeq() + total9 = + pool.peers({PeerType.Outgoing}, custom3).toSeq() + + check: + total1.toString() == + "[peer1,peer2,peer3,peer4,peer5,peer6,peer7,peer8,peer9,peer10]" + total2.toString() == "[peer1,peer2,peer3,peer4,peer5]" + total3.toString() == "[peer6,peer7,peer8,peer9,peer10]" + total4.toString() == "[peer4,peer5,peer6]" + total5.toString() == "[peer4,peer5]" + total6.toString() == "[peer6]" + total7.toString() == "[peer3,peer4,peer5,peer6,peer7,peer9]" + total8.toString() == "[peer3,peer4,peer5]" + total9.toString() == "[peer6,peer7,peer9]" + + checkTotal() + + block: + let + avail1 = + pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, + custom1).toSeq() + avail2 = + pool.availablePeers({PeerType.Incoming}, custom1).toSeq() + avail3 = + pool.availablePeers({PeerType.Outgoing}, custom1).toSeq() + avail4 = + pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, + custom2).toSeq() + avail5 = + pool.availablePeers({PeerType.Incoming}, custom2).toSeq() + avail6 = + pool.availablePeers({PeerType.Outgoing}, custom2).toSeq() + avail7 = + pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, + custom3).toSeq() + avail8 = + pool.availablePeers({PeerType.Incoming}, custom3).toSeq() + avail9 = + pool.availablePeers({PeerType.Outgoing}, custom3).toSeq() + + check: + avail1.toString() == + "[peer1,peer2,peer3,peer4,peer5,peer6,peer7,peer8,peer9,peer10]" + avail2.toString() == "[peer1,peer2,peer3,peer4,peer5]" + avail3.toString() == "[peer6,peer7,peer8,peer9,peer10]" + avail4.toString() == "[peer4,peer5,peer6]" + avail5.toString() == "[peer4,peer5]" + avail6.toString() == "[peer6]" + avail7.toString() == "[peer3,peer4,peer5,peer6,peer7,peer9]" + avail8.toString() == "[peer3,peer4,peer5]" + avail9.toString() == "[peer6,peer7,peer9]" + + let + tpeer1 = await pool.acquire({PeerType.Incoming, PeerType.Outgoing}, + custom1) + tpeer2 = await pool.acquire({PeerType.Incoming}, custom2) + tpeer3 = await pool.acquire({PeerType.Outgoing}, custom2) + tpeer4 = await pool.acquire({PeerType.Incoming}, custom3) + tpeer5 = await pool.acquire({PeerType.Outgoing}, custom3) + + check: + tpeer1.getKey() == "peer1" + tpeer2.getKey() == "peer4" + tpeer3.getKey() == "peer6" + tpeer4.getKey() == "peer3" + tpeer5.getKey() == "peer7" + + checkTotal() + + block: + let + avail1 = + pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, + custom1).toSeq() + avail2 = + pool.availablePeers({PeerType.Incoming}, custom1).toSeq() + avail3 = + pool.availablePeers({PeerType.Outgoing}, custom1).toSeq() + avail4 = + pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, + custom2).toSeq() + avail5 = + pool.availablePeers({PeerType.Incoming}, custom2).toSeq() + avail6 = + pool.availablePeers({PeerType.Outgoing}, custom2).toSeq() + avail7 = + pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, + custom3).toSeq() + avail8 = + pool.availablePeers({PeerType.Incoming}, custom3).toSeq() + avail9 = + pool.availablePeers({PeerType.Outgoing}, custom3).toSeq() + + check: + avail1.toString() == "[peer2,peer5,peer8,peer9,peer10]" + avail2.toString() == "[peer2,peer5]" + avail3.toString() == "[peer8,peer9,peer10]" + + avail4.toString() == "[peer5]" + avail5.toString() == "[peer5]" + avail6.toString() == "[]" + + avail7.toString() == "[peer5,peer9]" + avail8.toString() == "[peer5]" + avail9.toString() == "[peer9]" + + let + tpeer6 = await pool.acquire({PeerType.Incoming, PeerType.Outgoing}, + custom1) + tpeer7 = await pool.acquire({PeerType.Incoming}, custom2) + tpeer8 = await pool.acquire({PeerType.Outgoing}, custom3) + tpeer9 = await pool.acquire({PeerType.Outgoing}, custom1) + tpeer10 = await pool.acquire({PeerType.Outgoing}, custom1) + + check: + tpeer6.getKey() == "peer2" + tpeer7.getKey() == "peer5" + tpeer8.getKey() == "peer9" + tpeer9.getKey() == "peer8" + tpeer10.getKey() == "peer10" + + checkTotal() + + block: + let + avail1 = + pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, + custom1).toSeq() + avail2 = + pool.availablePeers({PeerType.Incoming}, custom1).toSeq() + avail3 = + pool.availablePeers({PeerType.Outgoing}, custom1).toSeq() + avail4 = + pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, + custom2).toSeq() + avail5 = + pool.availablePeers({PeerType.Incoming}, custom2).toSeq() + avail6 = + pool.availablePeers({PeerType.Outgoing}, custom2).toSeq() + avail7 = + pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, + custom3).toSeq() + avail8 = + pool.availablePeers({PeerType.Incoming}, custom3).toSeq() + avail9 = + pool.availablePeers({PeerType.Outgoing}, custom3).toSeq() + + check: + avail1.toString() == "[]" + avail2.toString() == "[]" + avail3.toString() == "[]" + + avail4.toString() == "[]" + avail5.toString() == "[]" + avail6.toString() == "[]" + + avail7.toString() == "[]" + avail8.toString() == "[]" + avail9.toString() == "[]" + + let + fut1 = pool.acquire({PeerType.Incoming}, custom2) + fut2 = pool.acquire({PeerType.Incoming}, custom2) + fut3 = pool.acquire({PeerType.Outgoing}, custom3) + fut4 = pool.acquire({PeerType.Outgoing}, custom3) + + check: + fut1.finished == false + fut2.finished == false + fut3.finished == false + fut4.finished == false + + pool.release(tpeer1) + await sleepAsync(100.milliseconds) + check: + fut1.finished == false + fut2.finished == false + fut3.finished == false + fut4.finished == false + + pool.release(tpeer2) + await sleepAsync(100.milliseconds) + check: + fut1.finished == true + fut1.value.getKey() == "peer4" + fut2.finished == false + fut3.finished == false + fut4.finished == false + + pool.release(tpeer3) + await sleepAsync(100.milliseconds) + check: + fut2.finished == false + fut3.finished == true + fut3.value.getKey() == "peer6" + fut4.finished == false + + pool.release(tpeer5) + await sleepAsync(100.milliseconds) + check: + fut2.finished == false + fut4.finished == true + fut4.value.getKey() == "peer7" + + pool.release(tpeer4) + pool.release(tpeer6) + pool.release(tpeer8) + pool.release(tpeer10) + await sleepAsync(100.milliseconds) + check: + fut2.finished == false + + pool.release(tpeer7) + await sleepAsync(100.milliseconds) + check: + fut2.finished == true + fut2.value.getKey() == "peer5" + test "Score check test": var pool = newPeerPool[PeerTest, PeerTestID]() func scoreCheck(peer: PeerTest): bool = @@ -889,8 +1180,8 @@ suite "PeerPool testing suite": pool7.lenSpace({PeerType.Incoming}) == 0 pool7.lenSpace({PeerType.Outgoing}) == high(int) - 39 - # We could not check whole high(int), so we check 10_000 items - for i in 0 ..< 10_000: + # We could not check whole high(int), so we check 1000 items + for i in 0 ..< 1000: check: pool7.addPeerNoWait(PeerTest.init("idOut" & $i), PeerType.Outgoing) == PeerStatus.Success @@ -914,8 +1205,8 @@ suite "PeerPool testing suite": pool8.lenSpace({PeerType.Outgoing}) == 0 pool8.lenSpace({PeerType.Incoming}) == high(int) - 40 - # We could not check whole high(int), so we check 10_000 items - for i in 0 ..< 10_000: + # We could not check whole high(int), so we check 1000 items + for i in 0 ..< 1000: check: pool8.addPeerNoWait(PeerTest.init("idInc" & $i), PeerType.Incoming) == PeerStatus.Success @@ -924,8 +1215,8 @@ suite "PeerPool testing suite": pool8.lenSpace({PeerType.Incoming}) == high(int) - 40 - (i + 1) # POOL 9 - # We could not check whole high(int), so we check 10_000 items - for i in 0 ..< 10_000: + # We could not check whole high(int), so we check 1000 items + for i in 0 ..< 1000: check: pool9.addPeerNoWait(PeerTest.init("idInc" & $i), PeerType.Incoming) == PeerStatus.Success From 0f9e5b6710dff7c2a9d57b374318cd79d8ed1b4b Mon Sep 17 00:00:00 2001 From: cheatfate Date: Wed, 19 Mar 2025 20:51:24 +0200 Subject: [PATCH 02/11] Update AllTests. --- AllTests-mainnet.md | 1 + 1 file changed, 1 insertion(+) diff --git a/AllTests-mainnet.md b/AllTests-mainnet.md index 701d38958a..5ea840b375 100644 --- a/AllTests-mainnet.md +++ b/AllTests-mainnet.md @@ -792,6 +792,7 @@ AllTests-mainnet + Access peers by key test OK + Acquire from empty pool OK + Acquire/Sorting and consistency test OK ++ Custom filters test OK + Delete peer on release text OK + Iterators test OK + Peer lifetime test OK From 083ade5bbe4f04834aa6481fbd60a79159f72531 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Wed, 19 Mar 2025 22:02:53 +0200 Subject: [PATCH 03/11] Eliminate unneeded constructions. --- tests/test_peer_pool.nim | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/test_peer_pool.nim b/tests/test_peer_pool.nim index 6a50e5f965..fddb590152 100644 --- a/tests/test_peer_pool.nim +++ b/tests/test_peer_pool.nim @@ -13,9 +13,6 @@ import chronos, chronos/unittest2/asynctests import ../beacon_chain/networking/peer_pool import ./testutil -template closureScope(raisesAnnotation: untyped, body: untyped): untyped = - (proc() {.raises: raisesAnnotation} = body)() - type PeerTestID = string PeerTest = object @@ -253,7 +250,7 @@ suite "PeerPool testing suite": itemFut23.finished == false itemFut24.finished == false - test "Acquire/Sorting and consistency test": closureScope([CatchableError]): + test "Acquire/Sorting and consistency test": const TestsCount = 1000 MaxNumber = 1_000_000 @@ -427,7 +424,7 @@ suite "PeerPool testing suite": check waitFor(testPeerLifetime()) == true - test "Safe/Clear test": closureScope([CatchableError]): + test "Safe/Clear test": var pool = newPeerPool[PeerTest, PeerTestID]() var peer1 = PeerTest.init("peer1", 10) var peer2 = PeerTest.init("peer2", 9) @@ -474,7 +471,7 @@ suite "PeerPool testing suite": asyncSpawn testConsumer() check waitFor(testClose()) == true - test "Access peers by key test": closureScope([CatchableError]): + test "Access peers by key test": var pool = newPeerPool[PeerTest, PeerTestID]() var peer1 = PeerTest.init("peer1", 10) var peer2 = PeerTest.init("peer2", 9) From a762f2d36192d7dc4de86a4bcfce301381d79920 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Tue, 25 Mar 2025 07:12:26 +0200 Subject: [PATCH 04/11] Address review comments. Fix clear() does not properly cleanup empties[] array. --- beacon_chain/networking/eth2_network.nim | 2 +- beacon_chain/networking/peer_pool.nim | 149 +++++++++---------- tests/test_peer_pool.nim | 181 ++++++++++++++++------- 3 files changed, 204 insertions(+), 128 deletions(-) diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 4e1a06c714..66b03dd651 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -481,7 +481,7 @@ func netKbps*(peer: Peer): float {.inline.} = round(((peer.netThroughput.average / 1024) * 10_000) / 10_000) # /!\ Must be exported to be seen by `peerpool`. -proc cmp*(a, b: Peer): int = +func cmp*(a, b: Peer): int = if a.score == b.score: cmp(a.netThroughput.average, b.netThroughput.average) else: diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index f03978f5a5..5daf0fb14f 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -7,7 +7,7 @@ {.push raises: [].} -import std/[tables, heapqueue, algorithm, sequtils] +import std/[tables, heapqueue, algorithm] import chronos export tables @@ -26,7 +26,7 @@ type LowScoreError, ## Peer has too low score. DeadPeerError ## Peer is already dead. - PeerIndex = distinct int + PeerIndex = int PeerItem[T] = object data: T @@ -47,7 +47,6 @@ type storage: seq[PeerItem[A]] registry: Table[B, PeerIndex] sorted: seq[PeerIndex] - unsorted: seq[PeerIndex] empties: seq[PeerIndex] scoreCheck: PeerScoreCheckCallback[A] onDeletePeer: PeerOnDeleteCallback[A] @@ -62,24 +61,34 @@ type PeerPoolError* = object of CatchableError -proc `==`*(a, b: PeerIndex): bool {.borrow.} - iterator pairs*[A, B](pool: PeerPool[A, B]): (B, A) = for peerId, pindex in pool.registry: - yield (peerId, pool.storage[int(pindex)].data) + yield (peerId, pool.storage[pindex].data) -proc resort[A, B](pool: PeerPool[A, B], - unsorted: openArray[PeerIndex]): seq[PeerIndex] = +proc resort[A, B]( + pool: PeerPool[A, B], + unsorted: openArray[PeerIndex] +): seq[PeerIndex] = mixin `cmp` proc pcmp(a, b: PeerIndex): int {.closure, raises: [].} = - cmp(pool.storage[int(a)].data, pool.storage[int(b)].data) + cmp(pool.storage[a].data, pool.storage[b].data) unsorted.sorted(pcmp, order = SortOrder.Descending) +proc bsearch[A, B]( + pool: PeerPool[A, B], + sorted: openArray[PeerIndex], + index: PeerIndex +): int = + mixin `cmp` + proc pcmp(a, b: PeerIndex): int {.closure, raises: [].} = + cmp(pool.storage[a].data, pool.storage[b].data) + sorted.binarySearch(index, pcmp) + proc addToStorage[A, B](pool: PeerPool[A, B], item: PeerItem[A]): PeerIndex = var indexedItem = item if len(pool.empties) > 0: indexedItem.index = pool.empties[0] - pool.storage[int(indexedItem.index)] = indexedItem + pool.storage[indexedItem.index] = indexedItem pool.empties.del(0) else: indexedItem.index = PeerIndex(len(pool.storage)) @@ -140,9 +149,6 @@ proc newPeerPool*[A, B]( scoreCheck: scoreCheckCb, peerCounter: peerCounterCb, onDeletePeer: onDeleteCb, - storage: newSeq[PeerItem[A]](), - unsorted: newSeq[PeerIndex](), - sorted: newSeq[PeerIndex](), maxPeersCount: maxPeersCount, maxIncPeersCount: maxIncPeersCount, maxOutPeersCount: maxOutPeersCount, @@ -160,7 +166,7 @@ proc len*[A, B](pool: PeerPool[A, B]): int = proc lenCurrent*[A, B](pool: PeerPool[A, B], filter = {PeerType.Incoming, - PeerType.Outgoing}): int {.inline.} = + PeerType.Outgoing}): int = ## Returns number of registered peers in PeerPool ``pool`` which satisfies ## filter ``filter``. (if PeerType.Incoming in filter: pool.curIncPeersCount else: 0) + @@ -169,7 +175,7 @@ proc lenCurrent*[A, B](pool: PeerPool[A, B], proc lenAvailable*[A, B]( pool: PeerPool[A, B], filter = {PeerType.Incoming, PeerType.Outgoing} -): int {.inline.} = +): int = ## Returns number of peers available for acquisition in PeerPool ## ``pool`` which satisfies filter ``filter``. (if PeerType.Incoming in filter: @@ -192,8 +198,8 @@ proc lenAvailable*[A, B]( ## Note: This is O(n) operation. let available = pool.lenAvailable(filter) var res = 0 - for sindex, pindex in pool.sorted.pairs(): - let item = addr(pool.storage[int(pindex)]) + for pindex in pool.sorted.items(): + let item = addr(pool.storage[pindex]) if (PeerFlags.Acquired notin item[].flags) and (item[].peerType in filter) and (isNil(customFilter) or customFilter(item[].data)): @@ -207,7 +213,7 @@ proc lenAvailable*[A, B]( proc lenAcquired*[A, B]( pool: PeerPool[A, B], filter = {PeerType.Incoming, PeerType.Outgoing} -): int {.inline.} = +): int = ## Returns number of acquired peers in PeerPool ``pool`` which satisifies ## filter ``filter``. (if PeerType.Incoming in filter: pool.acqIncPeersCount else: 0) + @@ -216,7 +222,7 @@ proc lenAcquired*[A, B]( proc lenSpace*[A, B]( pool: PeerPool[A, B], filter = {PeerType.Incoming, PeerType.Outgoing} -): int {.inline.} = +): int = ## Returns number of available space for peers in PeerPool ``pool`` which ## satisfies filter ``filter``. let @@ -274,14 +280,14 @@ proc deletePeerImpl[A, B]( key: B, pindex: PeerIndex ) = - pool.storage[int(pindex)] = PeerItem[A](index: PeerIndex(-1)) + pool.storage[pindex] = PeerItem[A](index: PeerIndex(-1)) pool.empties.add(pindex) pool.registry.del(key) - let uindex = pool.unsorted.find(pindex) - # `unsorted` keeps only peers which are available for acquisition, - if uindex >= 0: - pool.unsorted.del(uindex) - pool.sorted = pool.resort(pool.unsorted) + # let sindex = pool.sorted.find(pindex) + let sindex = pool.bsearch(pool.sorted, pindex) + if sindex >= 0: + pool.sorted.del(sindex) + pool.sorted = pool.resort(pool.sorted) # Indicate that we have an empty space pool.changeEvent.fire() pool.peerDeleted(peer) @@ -303,7 +309,7 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = return false res - var item = addr(pool.storage[int(pindex)]) + var item = addr(pool.storage[pindex]) if (PeerFlags.Acquired in item[].flags): if not(force): item[].flags.incl(PeerFlags.DeleteOnRelease) @@ -335,11 +341,12 @@ proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B, let item = PeerItem[A](data: peer, peerType: peerType) pindex = pool.addToStorage(item) - pitem = addr(pool.storage[int(pindex)]) + pitem = addr(pool.storage[pindex]) pool.registry[peerKey] = pindex - pool.unsorted.add(pindex) - pool.sorted = pool.resort(pool.unsorted) + pool.sorted.add(pindex) + pool.sorted = pool.resort(pool.sorted) + pitem[].data.getFuture().addCallback(onPeerClosed) case peerType of PeerType.Incoming: @@ -454,14 +461,14 @@ proc acquireItemImpl[A, B]( pool: PeerPool[A, B], filter: set[PeerType], customFilter: PeerCustomFilterCallback[A] = nil -): A {.inline.} = +): A = let (sindex, pitem) = block: var rindex: PeerIndex = PeerIndex(-1) res: ptr PeerItem[A] = nil for sindex, pindex in pool.sorted.pairs(): - res = addr(pool.storage[int(pindex)]) + res = addr(pool.storage[pindex]) if (PeerFlags.Acquired notin res[].flags) and (res[].peerType in filter) and (isNil(customFilter) or customFilter(res[].data)): @@ -471,9 +478,6 @@ proc acquireItemImpl[A, B]( doAssert(not(isNil(pitem))) doAssert(PeerFlags.Acquired notin pitem[].flags) - doAssert(len(pool.sorted) == len(pool.unsorted)) - let uindex = pool.unsorted.find(pool.sorted[int(sindex)]) - doAssert(uindex >= 0, "unsorted and sorted arrays should be equal!") case pitem[].peerType of PeerType.Incoming: @@ -481,8 +485,8 @@ proc acquireItemImpl[A, B]( of PeerType.Outgoing: inc(pool.acqOutPeersCount) - pool.unsorted.del(uindex) - pool.sorted = pool.resort(pool.unsorted) + pool.sorted.del(sindex) + pool.sorted = pool.resort(pool.sorted) pitem[].flags.incl(PeerFlags.Acquired) pitem[].data @@ -557,7 +561,7 @@ proc release*[A, B](pool: PeerPool[A, B], peer: A) = if res == PeerIndex(-1): return res - item = addr(pool.storage[int(pindex)]) + item = addr(pool.storage[pindex]) if PeerFlags.Acquired in item[].flags: if not(pool.checkPeerScore(peer)): @@ -578,11 +582,12 @@ proc release*[A, B](pool: PeerPool[A, B], peer: A) = dec(pool.acqIncPeersCount) of PeerType.Outgoing: dec(pool.acqOutPeersCount) - pool.unsorted.add(pindex) - pool.sorted = pool.resort(pool.unsorted) + + pool.sorted.add(pindex) + pool.sorted = pool.resort(pool.sorted) pool.changeEvent.fire() -proc release*[A, B](pool: PeerPool[A, B], peers: openArray[A]) {.inline.} = +proc release*[A, B](pool: PeerPool[A, B], peers: openArray[A]) = ## Release array of peers ``peers`` back to PeerPool ``pool``. for item in peers: pool.release(item) @@ -595,7 +600,7 @@ proc acquire*[A, B]( ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the ## filter ``filter``. doAssert(filter != {}, "Filter must not be empty") - var peers = newSeq[A]() + var peers: seq[A] try: if number > 0: while true: @@ -626,7 +631,7 @@ proc acquire*[A, B]( ## wait for ``number`` of peers which satisfy filter will become available ## and acquired. doAssert(filter != {}, "Filter must not be empty") - var peers = newSeq[A]() + var peers: seq[A] try: if number > 0: while true: @@ -655,7 +660,7 @@ proc acquireNoWait*[A, B]( ## filter ``filter``. This procedure does not wait for peers, it will raise ## `PeerPoolError` if peers matching the filters are not available. doAssert(filter != {}, "Filter must not be empty") - var peers = newSeq[A]() + var peers: seq[A] if pool.lenAvailable(filter) < number: raise newException(PeerPoolError, "Not enough peers in pool") for i in 0 ..< number: @@ -673,7 +678,7 @@ proc acquireNoWait*[A, B]( ## wait for peers, it will raise `PeerPoolError` if peers matching the ## filters are not available. doAssert(filter != {}, "Filter must not be empty") - var peers = newSeq[A]() + var peers: seq[A] if pool.lenAvailable(filter, customFilter) < number: raise newException(PeerPoolError, "Not enough peers in pool") for i in 0 ..< number: @@ -716,13 +721,12 @@ iterator peers*[A, B]( ## will be first. var unsorted: seq[PeerIndex] for pindex in pool.registry.values(): - if pool.storage[int(pindex)].peerType in filter: + if pool.storage[pindex].peerType in filter: unsorted.add(pindex) - let sorted = pool.resort(unsorted).mapIt(pool.storage[int(it)].data) - - for item in sorted: - yield item + let sorted = pool.resort(unsorted) + for sindex in sorted: + yield pool.storage[sindex].data iterator peers*[A, B]( pool: PeerPool[A, B], @@ -735,15 +739,14 @@ iterator peers*[A, B]( ## will be first. var unsorted: seq[PeerIndex] for pindex in pool.registry.values(): - let item = addr(pool.storage[int(pindex)]) + let item = addr(pool.storage[pindex]) if (item[].peerType in filter) and (isNil(customFilter) or customFilter(item[].data)): unsorted.add(pindex) - let sorted = pool.resort(unsorted).mapIt(pool.storage[int(it)].data) - - for item in sorted: - yield item + let sorted = pool.resort(unsorted) + for sindex in sorted: + yield pool.storage[sindex].data iterator availablePeers*[A, B]( pool: PeerPool[A, B], @@ -754,9 +757,9 @@ iterator availablePeers*[A, B]( ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. for pindex in pool.sorted: - if (PeerFlags.Acquired notin pool.storage[int(pindex)].flags) and - (pool.storage[int(pindex)].peerType in filter): - yield pool.storage[int(pindex)].data + if (PeerFlags.Acquired notin pool.storage[pindex].flags) and + (pool.storage[pindex].peerType in filter): + yield pool.storage[pindex].data iterator availablePeers*[A, B]( pool: PeerPool[A, B], @@ -768,7 +771,7 @@ iterator availablePeers*[A, B]( ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. for pindex in pool.sorted: - let item = addr(pool.storage[int(pindex)]) + let item = addr(pool.storage[pindex]) if (PeerFlags.Acquired notin item[].flags) and (item[].peerType in filter) and (isNil(customFilter) or customFilter(item[].data)): @@ -782,36 +785,29 @@ iterator acquiredPeers*[A, B]( ## ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. - mixin `cmp` - proc pcmp(a, b: PeerIndex): int {.closure, raises: [].} = - cmp(pool.storage[int(a)].data, pool.storage[int(b)].data) - var unsorted: seq[PeerIndex] for pindex in pool.registry.values(): - if (PeerFlags.Acquired in pool.storage[int(pindex)].flags) and - (pool.storage[int(pindex)].peerType in filter): + if (PeerFlags.Acquired in pool.storage[pindex].flags) and + (pool.storage[pindex].peerType in filter): unsorted.add(pindex) - let sorted = - unsorted.sorted(pcmp, order = SortOrder.Descending). - mapIt(pool.storage[int(it)].data) - - for item in sorted: - yield item + let sorted = pool.resort(unsorted) + for sindex in sorted: + yield pool.storage[sindex].data proc `[]`*[A, B]( pool: PeerPool[A, B], key: B ): A {.inline, raises: [KeyError].} = ## Retrieve peer with key ``key`` from PeerPool ``pool``. - pool.storage[int(pool.registry[key])].data + pool.storage[pool.registry[key]].data proc `[]`*[A, B]( pool: var PeerPool[A, B], key: B ): var A {.inline, raises: [KeyError].} = ## Retrieve peer with key ``key`` from PeerPool ``pool``. - pool.storage[int(pool.registry[key])].data + pool.storage[pool.registry[key]].data proc hasPeer*[A, B](pool: PeerPool[A, B], key: B): bool {.inline.} = ## Returns ``true`` if peer with ``key`` present in PeerPool ``pool``. @@ -823,7 +819,7 @@ proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B): A {.inline.} = ## (e.g. 0 for any integer type). let pindex = pool.registry.getOrDefault(key, PeerIndex(-1)) if pindex != PeerIndex(-1): - pool.storage[int(pindex)].data + pool.storage[pindex].data else: A() @@ -833,18 +829,19 @@ proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B, ## not present, default value ``default`` is returned. let pindex = pool.registry.getOrDefault(key, PeerIndex(-1)) if pindex != PeerIndex(-1): - pool.storage[int(pindex)].data + pool.storage[pindex].data else: default proc clear*[A, B](pool: PeerPool[A, B]) = ## Performs PeerPool's ``pool`` storage and counters reset. pool.registry.clear() + pool.sorted.reset() - pool.unsorted.reset() for i in 0 ..< len(pool.storage): pool.storage[i] = PeerItem[A]() - pool.storage.setLen(0) + pool.empties.reset() + pool.storage.reset() pool.curIncPeersCount = 0 pool.curOutPeersCount = 0 pool.acqIncPeersCount = 0 diff --git a/tests/test_peer_pool.nim b/tests/test_peer_pool.nim index fddb590152..16579b657e 100644 --- a/tests/test_peer_pool.nim +++ b/tests/test_peer_pool.nim @@ -319,61 +319,140 @@ suite "PeerPool testing suite": check waitFor(testAcquireRelease()) == TestsCount - test "deletePeer() test": - proc testDeletePeer(): Future[bool] {.async.} = - var pool = newPeerPool[PeerTest, PeerTestID]() - var peer = PeerTest.init("deletePeer") - - ## Delete available peer - doAssert(pool.addPeerNoWait(peer, - PeerType.Incoming) == PeerStatus.Success) - doAssert(pool.len == 1) - doAssert(pool.lenAvailable == 1) - doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) - doAssert(pool.lenAvailable({PeerType.Incoming}) == 1) - doAssert(pool.deletePeer(peer) == true) - doAssert(pool.len == 0) - doAssert(pool.lenAvailable == 0) - doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) - doAssert(pool.lenAvailable({PeerType.Incoming}) == 0) + asyncTest "deletePeer() test": + var pool = newPeerPool[PeerTest, PeerTestID]() - ## Delete acquired peer - peer = PeerTest.init("closingPeer") - doAssert(pool.addPeerNoWait(peer, - PeerType.Incoming) == PeerStatus.Success) - doAssert(pool.len == 1) - doAssert(pool.lenAvailable == 1) - doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) - doAssert(pool.lenAvailable({PeerType.Incoming}) == 1) - var apeer = await pool.acquire() - doAssert(pool.deletePeer(peer) == true) - doAssert(pool.len == 1) - doAssert(pool.lenAvailable == 0) - doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) - doAssert(pool.lenAvailable({PeerType.Incoming}) == 0) + ## Delete available peer + block: + let peer = PeerTest.init("deletePeer") + check: + pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success + pool.len == 1 + pool.lenAvailable == 1 + pool.lenAvailable({PeerType.Outgoing}) == 0 + pool.lenAvailable({PeerType.Incoming}) == 1 + pool.deletePeer(peer) == true + pool.len == 0 + pool.lenAvailable == 0 + pool.lenAvailable({PeerType.Outgoing}) == 0 + pool.lenAvailable({PeerType.Incoming}) == 0 + + ## Delete acquired peer + block: + let peer = PeerTest.init("closingPeer") + check: + pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success + pool.len == 1 + pool.lenAvailable == 1 + pool.lenAvailable({PeerType.Outgoing}) == 0 + pool.lenAvailable({PeerType.Incoming}) == 1 + let apeer = await pool.acquire() + check: + pool.deletePeer(peer) == true + pool.len == 1 + pool.lenAvailable == 0 + pool.lenAvailable({PeerType.Outgoing}) == 0 + pool.lenAvailable({PeerType.Incoming}) == 0 pool.release(apeer) - doAssert(pool.len == 0) - doAssert(pool.lenAvailable == 0) - doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) - doAssert(pool.lenAvailable({PeerType.Incoming}) == 0) + check: + pool.len == 0 + pool.lenAvailable == 0 + pool.lenAvailable({PeerType.Outgoing}) == 0 + pool.lenAvailable({PeerType.Incoming}) == 0 - ## Force delete acquired peer - peer = PeerTest.init("closingPeer") - doAssert(pool.addPeerNoWait(peer, - PeerType.Incoming) == PeerStatus.Success) - doAssert(pool.len == 1) - doAssert(pool.lenAvailable == 1) - doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) - doAssert(pool.lenAvailable({PeerType.Incoming}) == 1) - apeer = await pool.acquire() - doAssert(pool.deletePeer(peer, true) == true) - doAssert(pool.len == 0) - doAssert(pool.lenAvailable == 0) - doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) - doAssert(pool.lenAvailable({PeerType.Incoming}) == 0) + ## Force delete acquired peer + block: + let peer = PeerTest.init("closingPeer") + check: + pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success + pool.len == 1 + pool.lenAvailable == 1 + pool.lenAvailable({PeerType.Outgoing}) == 0 + pool.lenAvailable({PeerType.Incoming}) == 1 + let apeer = await pool.acquire() + check: + pool.deletePeer(apeer, true) == true + pool.len == 0 + pool.lenAvailable == 0 + pool.lenAvailable({PeerType.Outgoing}) == 0 + pool.lenAvailable({PeerType.Incoming}) == 0 - result = true - check waitFor(testDeletePeer()) == true + ## Delete single available peer in pool full of peers + block: + for i in 0 ..< 100: + let peer = PeerTest.init("peer" & $i) + check pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success + for i in 100 ..< 200: + let peer = PeerTest.init("peer" & $i) + check pool.addPeerNoWait(peer, PeerType.Outgoing) == PeerStatus.Success + check: + pool.len == 200 + pool.lenAvailable == 200 + pool.lenAvailable({PeerType.Outgoing}) == 100 + pool.lenAvailable({PeerType.Incoming}) == 100 + for i in 0 ..< 20: + let + index = 90 + i + peerKey = "peer" & $index + dpeer = pool.getOrDefault(peerKey, default(PeerTest)) + check: + pool.deletePeer(dpeer) == true + pool.hasPeer(peerKey) == false + check: + pool.len == 180 + pool.lenAvailable == 180 + pool.lenAvailable({PeerType.Outgoing}) == 90 + pool.lenAvailable({PeerType.Incoming}) == 90 + pool.clear() + + ## Delete single acquired peer in pool full of peers + block: + for i in 0 ..< 100: + let peer = PeerTest.init("peer" & $i) + check pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success + for i in 100 ..< 200: + let peer = PeerTest.init("peer" & $i) + check pool.addPeerNoWait(peer, PeerType.Outgoing) == PeerStatus.Success + check: + pool.len == 200 + pool.lenAvailable == 200 + pool.lenAvailable({PeerType.Outgoing}) == 100 + pool.lenAvailable({PeerType.Incoming}) == 100 + + for i in 0 ..< 20: + let apeer = await pool.acquire() + check pool.deletePeer(apeer) == true + pool.release(apeer) + check pool.hasPeer(apeer.getKey()) == false + + check: + pool.len == 180 + pool.lenAvailable == 180 + pool.clear() + + ## Force delete single acquired peer in pool full of peers + block: + for i in 0 ..< 100: + let peer = PeerTest.init("peer" & $i) + check pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success + for i in 100 ..< 200: + let peer = PeerTest.init("peer" & $i) + check pool.addPeerNoWait(peer, PeerType.Outgoing) == PeerStatus.Success + check: + pool.len == 200 + pool.lenAvailable == 200 + pool.lenAvailable({PeerType.Outgoing}) == 100 + pool.lenAvailable({PeerType.Incoming}) == 100 + + for i in 0 ..< 20: + let apeer = await pool.acquire() + check: + pool.deletePeer(apeer, true) == true + pool.hasPeer(apeer.getKey()) == false + + check: + pool.len == 180 + pool.lenAvailable == 180 test "Peer lifetime test": proc testPeerLifetime(): Future[bool] {.async.} = From c79ffd4208285065bab38927d4cee0b3d67888f0 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Tue, 25 Mar 2025 16:40:21 +0200 Subject: [PATCH 05/11] Attempt to fix crash but still use binary search instead of linear. --- beacon_chain/networking/peer_pool.nim | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index 5daf0fb14f..1a96af0581 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -280,14 +280,15 @@ proc deletePeerImpl[A, B]( key: B, pindex: PeerIndex ) = + let sindex = pool.bsearch(pool.sorted, pindex) pool.storage[pindex] = PeerItem[A](index: PeerIndex(-1)) pool.empties.add(pindex) pool.registry.del(key) - # let sindex = pool.sorted.find(pindex) - let sindex = pool.bsearch(pool.sorted, pindex) if sindex >= 0: - pool.sorted.del(sindex) + # sindex == 0 when deleting peer which was acquired (not in `sorted` array). pool.sorted = pool.resort(pool.sorted) + pool.sorted.del(sindex) + # Indicate that we have an empty space pool.changeEvent.fire() pool.peerDeleted(peer) From bd34c773a804db246716259f1d81710cbe689390 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Wed, 26 Mar 2025 12:51:42 +0200 Subject: [PATCH 06/11] Make iterators safe in async environment. --- beacon_chain/networking/peer_pool.nim | 59 ++++++++++++++++++--------- 1 file changed, 39 insertions(+), 20 deletions(-) diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index 1a96af0581..0ebc93cc91 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -7,7 +7,7 @@ {.push raises: [].} -import std/[tables, heapqueue, algorithm] +import std/[tables, heapqueue, algorithm, sequtils] import chronos export tables @@ -725,9 +725,11 @@ iterator peers*[A, B]( if pool.storage[pindex].peerType in filter: unsorted.add(pindex) - let sorted = pool.resort(unsorted) - for sindex in sorted: - yield pool.storage[sindex].data + # We allocate new sequence here to avoid problems with missing indices when + # await operation could be part of iteration. + let sortedPeers = pool.resort(unsorted).mapIt(pool.storage[it].data) + for peer in sortedPeers: + yield peer iterator peers*[A, B]( pool: PeerPool[A, B], @@ -745,9 +747,11 @@ iterator peers*[A, B]( (isNil(customFilter) or customFilter(item[].data)): unsorted.add(pindex) - let sorted = pool.resort(unsorted) - for sindex in sorted: - yield pool.storage[sindex].data + # We allocate new sequence here to avoid problems with missing indices when + # await operation could be part of iteration. + let sortedPeers = pool.resort(unsorted).mapIt(pool.storage[it].data) + for peer in sortedPeers: + yield peer iterator availablePeers*[A, B]( pool: PeerPool[A, B], @@ -757,10 +761,17 @@ iterator availablePeers*[A, B]( ## ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. - for pindex in pool.sorted: - if (PeerFlags.Acquired notin pool.storage[pindex].flags) and - (pool.storage[pindex].peerType in filter): - yield pool.storage[pindex].data + + # We allocate new sequence here to avoid problems with missing indices when + # await operation could be part of iteration. + let sortedPeers = + pool.sorted.filterIt( + (PeerFlags.Acquired notin pool.storage[it].flags) and + (pool.storage[it].peerType in filter)). + mapIt(pool.storage[it].data) + + for peer in sortedPeers: + yield peer iterator availablePeers*[A, B]( pool: PeerPool[A, B], @@ -771,12 +782,18 @@ iterator availablePeers*[A, B]( ## ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. - for pindex in pool.sorted: - let item = addr(pool.storage[pindex]) - if (PeerFlags.Acquired notin item[].flags) and - (item[].peerType in filter) and - (isNil(customFilter) or customFilter(item[].data)): - yield item[].data + + # We allocate new sequence here to avoid problems with missing indices when + # await operation could be part of iteration. + let sortedPeers = + pool.sorted.filterIt( + (PeerFlags.Acquired notin pool.storage[it].flags) and + (pool.storage[it].peerType in filter) and + (isNil(customFilter) or customFilter(pool.storage[it].data))). + mapIt(pool.storage[it].data) + + for peer in sortedPeers: + yield peer iterator acquiredPeers*[A, B]( pool: PeerPool[A, B], @@ -792,9 +809,11 @@ iterator acquiredPeers*[A, B]( (pool.storage[pindex].peerType in filter): unsorted.add(pindex) - let sorted = pool.resort(unsorted) - for sindex in sorted: - yield pool.storage[sindex].data + # We allocate new sequence here to avoid problems with missing indices when + # await operation could be part of iteration. + let sortedPeers = pool.resort(unsorted).mapIt(pool.storage[it].data) + for peer in sortedPeers: + yield peer proc `[]`*[A, B]( pool: PeerPool[A, B], From 7e66a76a886684b42c6b69a8cb3799d950d57e0c Mon Sep 17 00:00:00 2001 From: cheatfate Date: Wed, 26 Mar 2025 13:02:40 +0200 Subject: [PATCH 07/11] Add comments about iterators usage. --- beacon_chain/networking/peer_pool.nim | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index 0ebc93cc91..4dd021d5e7 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -720,6 +720,10 @@ iterator peers*[A, B]( ## ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. + ## + ## NOTE: While it safe to use this iterator in combination with await calls, + ## consider that right after `await` call PeerPool could becomed different + ## from the snapshot this iterator provides. var unsorted: seq[PeerIndex] for pindex in pool.registry.values(): if pool.storage[pindex].peerType in filter: @@ -740,6 +744,10 @@ iterator peers*[A, B]( ## ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. + ## + ## NOTE: While it safe to use this iterator in combination with await calls, + ## consider that right after `await` call PeerPool could becomed different + ## from the snapshot this iterator provides. var unsorted: seq[PeerIndex] for pindex in pool.registry.values(): let item = addr(pool.storage[pindex]) @@ -761,6 +769,10 @@ iterator availablePeers*[A, B]( ## ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. + ## + ## NOTE: While it safe to use this iterator in combination with await calls, + ## consider that right after `await` call PeerPool could becomed different + ## from the snapshot this iterator provides. # We allocate new sequence here to avoid problems with missing indices when # await operation could be part of iteration. @@ -782,6 +794,10 @@ iterator availablePeers*[A, B]( ## ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. + ## + ## NOTE: While it safe to use this iterator in combination with await calls, + ## consider that right after `await` call PeerPool could becomed different + ## from the snapshot this iterator provides. # We allocate new sequence here to avoid problems with missing indices when # await operation could be part of iteration. @@ -803,6 +819,10 @@ iterator acquiredPeers*[A, B]( ## ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. + ## + ## NOTE: While it safe to use this iterator in combination with await calls, + ## consider that right after `await` call PeerPool could becomed different + ## from the snapshot this iterator provides. var unsorted: seq[PeerIndex] for pindex in pool.registry.values(): if (PeerFlags.Acquired in pool.storage[pindex].flags) and From 030324f76ec545959879898c120e1567aeb0cea6 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Thu, 27 Mar 2025 00:27:16 +0200 Subject: [PATCH 08/11] Address review comments. --- beacon_chain/networking/peer_pool.nim | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index 4dd021d5e7..6cac3cde4a 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -722,7 +722,7 @@ iterator peers*[A, B]( ## will be first. ## ## NOTE: While it safe to use this iterator in combination with await calls, - ## consider that right after `await` call PeerPool could becomed different + ## consider that right after `await` call, PeerPool could become different ## from the snapshot this iterator provides. var unsorted: seq[PeerIndex] for pindex in pool.registry.values(): @@ -746,7 +746,7 @@ iterator peers*[A, B]( ## will be first. ## ## NOTE: While it safe to use this iterator in combination with await calls, - ## consider that right after `await` call PeerPool could becomed different + ## consider that right after `await` call, PeerPool could become different ## from the snapshot this iterator provides. var unsorted: seq[PeerIndex] for pindex in pool.registry.values(): @@ -771,7 +771,7 @@ iterator availablePeers*[A, B]( ## will be first. ## ## NOTE: While it safe to use this iterator in combination with await calls, - ## consider that right after `await` call PeerPool could becomed different + ## consider that right after `await` call, PeerPool could become different ## from the snapshot this iterator provides. # We allocate new sequence here to avoid problems with missing indices when @@ -796,7 +796,7 @@ iterator availablePeers*[A, B]( ## will be first. ## ## NOTE: While it safe to use this iterator in combination with await calls, - ## consider that right after `await` call PeerPool could becomed different + ## consider that right after `await` call, PeerPool could become different ## from the snapshot this iterator provides. # We allocate new sequence here to avoid problems with missing indices when @@ -821,7 +821,7 @@ iterator acquiredPeers*[A, B]( ## will be first. ## ## NOTE: While it safe to use this iterator in combination with await calls, - ## consider that right after `await` call PeerPool could becomed different + ## consider that right after `await` call, PeerPool could become different ## from the snapshot this iterator provides. var unsorted: seq[PeerIndex] for pindex in pool.registry.values(): From 8e650a473222535cae79b9702dcf442bf6018531 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Mon, 31 Mar 2025 13:23:12 +0300 Subject: [PATCH 09/11] Fix number of PeerPool crashes. Switch back to distinct PeerIndex type to get ability to use custom cmp function. Switch back to linear search because binarySearch could not be working correctly. Make tests work with `ref Peer` instead of `Peer`, which helps reveal crashes. --- beacon_chain/networking/peer_pool.nim | 89 +++++++++++++-------------- tests/test_peer_pool.nim | 9 ++- 2 files changed, 49 insertions(+), 49 deletions(-) diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index 6cac3cde4a..3eea3a386e 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -7,7 +7,7 @@ {.push raises: [].} -import std/[tables, heapqueue, algorithm, sequtils] +import std/[tables, heapqueue, algorithm, sequtils, typetraits] import chronos export tables @@ -26,7 +26,9 @@ type LowScoreError, ## Peer has too low score. DeadPeerError ## Peer is already dead. - PeerIndex = int + PeerIndex = distinct int + # Distinct type is important here, because we are using custom sorting + # functions which are not compatible with integer behavior. PeerItem[T] = object data: T @@ -61,9 +63,11 @@ type PeerPoolError* = object of CatchableError +func `==`*(a, b: PeerIndex): bool {.borrow.} + iterator pairs*[A, B](pool: PeerPool[A, B]): (B, A) = for peerId, pindex in pool.registry: - yield (peerId, pool.storage[pindex].data) + yield (peerId, pool.storage[distinctBase(pindex)].data) proc resort[A, B]( pool: PeerPool[A, B], @@ -71,24 +75,14 @@ proc resort[A, B]( ): seq[PeerIndex] = mixin `cmp` proc pcmp(a, b: PeerIndex): int {.closure, raises: [].} = - cmp(pool.storage[a].data, pool.storage[b].data) + cmp(pool.storage[distinctBase(a)].data, pool.storage[distinctBase(b)].data) unsorted.sorted(pcmp, order = SortOrder.Descending) -proc bsearch[A, B]( - pool: PeerPool[A, B], - sorted: openArray[PeerIndex], - index: PeerIndex -): int = - mixin `cmp` - proc pcmp(a, b: PeerIndex): int {.closure, raises: [].} = - cmp(pool.storage[a].data, pool.storage[b].data) - sorted.binarySearch(index, pcmp) - proc addToStorage[A, B](pool: PeerPool[A, B], item: PeerItem[A]): PeerIndex = var indexedItem = item if len(pool.empties) > 0: indexedItem.index = pool.empties[0] - pool.storage[indexedItem.index] = indexedItem + pool.storage[distinctBase(indexedItem.index)] = indexedItem pool.empties.del(0) else: indexedItem.index = PeerIndex(len(pool.storage)) @@ -199,7 +193,7 @@ proc lenAvailable*[A, B]( let available = pool.lenAvailable(filter) var res = 0 for pindex in pool.sorted.items(): - let item = addr(pool.storage[pindex]) + let item = addr(pool.storage[distinctBase(pindex)]) if (PeerFlags.Acquired notin item[].flags) and (item[].peerType in filter) and (isNil(customFilter) or customFilter(item[].data)): @@ -280,14 +274,14 @@ proc deletePeerImpl[A, B]( key: B, pindex: PeerIndex ) = - let sindex = pool.bsearch(pool.sorted, pindex) - pool.storage[pindex] = PeerItem[A](index: PeerIndex(-1)) + let sindex = pool.sorted.find(pindex) + pool.storage[distinctBase(pindex)] = PeerItem[A](index: PeerIndex(-1)) pool.empties.add(pindex) pool.registry.del(key) if sindex >= 0: - # sindex == 0 when deleting peer which was acquired (not in `sorted` array). - pool.sorted = pool.resort(pool.sorted) + # sindex == -1 when deleting peer which was acquired (not in `sorted` array). pool.sorted.del(sindex) + pool.sorted = pool.resort(pool.sorted) # Indicate that we have an empty space pool.changeEvent.fire() @@ -310,7 +304,7 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = return false res - var item = addr(pool.storage[pindex]) + var item = addr(pool.storage[distinctBase(pindex)]) if (PeerFlags.Acquired in item[].flags): if not(force): item[].flags.incl(PeerFlags.DeleteOnRelease) @@ -342,7 +336,7 @@ proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B, let item = PeerItem[A](data: peer, peerType: peerType) pindex = pool.addToStorage(item) - pitem = addr(pool.storage[pindex]) + pitem = addr(pool.storage[distinctBase(pindex)]) pool.registry[peerKey] = pindex pool.sorted.add(pindex) @@ -466,19 +460,18 @@ proc acquireItemImpl[A, B]( let (sindex, pitem) = block: var - rindex: PeerIndex = PeerIndex(-1) + rindex = -1 res: ptr PeerItem[A] = nil for sindex, pindex in pool.sorted.pairs(): - res = addr(pool.storage[pindex]) + res = addr(pool.storage[distinctBase(pindex)]) if (PeerFlags.Acquired notin res[].flags) and (res[].peerType in filter) and (isNil(customFilter) or customFilter(res[].data)): - rindex = PeerIndex(sindex) + rindex = sindex break (rindex, res) - doAssert(not(isNil(pitem))) - doAssert(PeerFlags.Acquired notin pitem[].flags) + doAssert(sindex >= 0) case pitem[].peerType of PeerType.Incoming: @@ -562,7 +555,7 @@ proc release*[A, B](pool: PeerPool[A, B], peer: A) = if res == PeerIndex(-1): return res - item = addr(pool.storage[pindex]) + item = addr(pool.storage[distinctBase(pindex)]) if PeerFlags.Acquired in item[].flags: if not(pool.checkPeerScore(peer)): @@ -726,12 +719,13 @@ iterator peers*[A, B]( ## from the snapshot this iterator provides. var unsorted: seq[PeerIndex] for pindex in pool.registry.values(): - if pool.storage[pindex].peerType in filter: + if pool.storage[distinctBase(pindex)].peerType in filter: unsorted.add(pindex) # We allocate new sequence here to avoid problems with missing indices when # await operation could be part of iteration. - let sortedPeers = pool.resort(unsorted).mapIt(pool.storage[it].data) + let sortedPeers = + pool.resort(unsorted).mapIt(pool.storage[distinctBase(it)].data) for peer in sortedPeers: yield peer @@ -750,14 +744,15 @@ iterator peers*[A, B]( ## from the snapshot this iterator provides. var unsorted: seq[PeerIndex] for pindex in pool.registry.values(): - let item = addr(pool.storage[pindex]) + let item = addr(pool.storage[distinctBase(pindex)]) if (item[].peerType in filter) and (isNil(customFilter) or customFilter(item[].data)): unsorted.add(pindex) # We allocate new sequence here to avoid problems with missing indices when # await operation could be part of iteration. - let sortedPeers = pool.resort(unsorted).mapIt(pool.storage[it].data) + let sortedPeers = + pool.resort(unsorted).mapIt(pool.storage[distinctBase(it)].data) for peer in sortedPeers: yield peer @@ -778,9 +773,9 @@ iterator availablePeers*[A, B]( # await operation could be part of iteration. let sortedPeers = pool.sorted.filterIt( - (PeerFlags.Acquired notin pool.storage[it].flags) and - (pool.storage[it].peerType in filter)). - mapIt(pool.storage[it].data) + (PeerFlags.Acquired notin pool.storage[distinctBase(it)].flags) and + (pool.storage[distinctBase(it)].peerType in filter)). + mapIt(pool.storage[distinctBase(it)].data) for peer in sortedPeers: yield peer @@ -803,10 +798,11 @@ iterator availablePeers*[A, B]( # await operation could be part of iteration. let sortedPeers = pool.sorted.filterIt( - (PeerFlags.Acquired notin pool.storage[it].flags) and - (pool.storage[it].peerType in filter) and - (isNil(customFilter) or customFilter(pool.storage[it].data))). - mapIt(pool.storage[it].data) + (PeerFlags.Acquired notin pool.storage[distinctBase(it)].flags) and + (pool.storage[distinctBase(it)].peerType in filter) and + (isNil(customFilter) or + customFilter(pool.storage[distinctBase(it)].data))). + mapIt(pool.storage[distinctBase(it)].data) for peer in sortedPeers: yield peer @@ -825,13 +821,14 @@ iterator acquiredPeers*[A, B]( ## from the snapshot this iterator provides. var unsorted: seq[PeerIndex] for pindex in pool.registry.values(): - if (PeerFlags.Acquired in pool.storage[pindex].flags) and - (pool.storage[pindex].peerType in filter): + if (PeerFlags.Acquired in pool.storage[distinctBase(pindex)].flags) and + (pool.storage[distinctBase(pindex)].peerType in filter): unsorted.add(pindex) # We allocate new sequence here to avoid problems with missing indices when # await operation could be part of iteration. - let sortedPeers = pool.resort(unsorted).mapIt(pool.storage[it].data) + let sortedPeers = + pool.resort(unsorted).mapIt(pool.storage[distinctBase(it)].data) for peer in sortedPeers: yield peer @@ -840,14 +837,14 @@ proc `[]`*[A, B]( key: B ): A {.inline, raises: [KeyError].} = ## Retrieve peer with key ``key`` from PeerPool ``pool``. - pool.storage[pool.registry[key]].data + pool.storage[distinctBase(pool.registry[key])].data proc `[]`*[A, B]( pool: var PeerPool[A, B], key: B ): var A {.inline, raises: [KeyError].} = ## Retrieve peer with key ``key`` from PeerPool ``pool``. - pool.storage[pool.registry[key]].data + pool.storage[distinctBase(pool.registry[key])].data proc hasPeer*[A, B](pool: PeerPool[A, B], key: B): bool {.inline.} = ## Returns ``true`` if peer with ``key`` present in PeerPool ``pool``. @@ -859,7 +856,7 @@ proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B): A {.inline.} = ## (e.g. 0 for any integer type). let pindex = pool.registry.getOrDefault(key, PeerIndex(-1)) if pindex != PeerIndex(-1): - pool.storage[pindex].data + pool.storage[distinctBase(pindex)].data else: A() @@ -869,7 +866,7 @@ proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B, ## not present, default value ``default`` is returned. let pindex = pool.registry.getOrDefault(key, PeerIndex(-1)) if pindex != PeerIndex(-1): - pool.storage[pindex].data + pool.storage[distinctBase(pindex)].data else: default diff --git a/tests/test_peer_pool.nim b/tests/test_peer_pool.nim index 16579b657e..3ef265d225 100644 --- a/tests/test_peer_pool.nim +++ b/tests/test_peer_pool.nim @@ -15,7 +15,7 @@ import ./testutil type PeerTestID = string - PeerTest = object + PeerTest = ref object id: PeerTestID weight: int metadata: uint64 @@ -30,8 +30,11 @@ func getFuture(peer: PeerTest): Future[void] = func getMetadata(peer: PeerTest): uint64 = peer.metadata -func `<`(a, b: PeerTest): bool = - `<`(a.weight, b.weight) +func cmp*(a, b: PeerTest): int = + if a.weight == b.weight: + cmp(a.throughput, b.throughput) + else: + cmp(a.weight, b.weight) proc init*(t: typedesc[PeerTest], id: string = "", weight: int = 0): PeerTest = From 6a631549db1013f3c5ecdcd4cd77733017fee3f2 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Mon, 31 Mar 2025 13:59:17 +0300 Subject: [PATCH 10/11] Eliminate debugging symbols. --- tests/test_peer_pool.nim | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/test_peer_pool.nim b/tests/test_peer_pool.nim index 3ef265d225..a623d8f954 100644 --- a/tests/test_peer_pool.nim +++ b/tests/test_peer_pool.nim @@ -31,10 +31,7 @@ func getMetadata(peer: PeerTest): uint64 = peer.metadata func cmp*(a, b: PeerTest): int = - if a.weight == b.weight: - cmp(a.throughput, b.throughput) - else: - cmp(a.weight, b.weight) + cmp(a.weight, b.weight) proc init*(t: typedesc[PeerTest], id: string = "", weight: int = 0): PeerTest = From c914cb132557ccb1a0c147498fa6875739f3afbf Mon Sep 17 00:00:00 2001 From: cheatfate Date: Mon, 31 Mar 2025 18:17:29 +0300 Subject: [PATCH 11/11] Address review comments. --- beacon_chain/networking/peer_pool.nim | 30 ++++++++++++++++----------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index 3eea3a386e..a03342bf59 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -71,12 +71,20 @@ iterator pairs*[A, B](pool: PeerPool[A, B]): (B, A) = proc resort[A, B]( pool: PeerPool[A, B], - unsorted: openArray[PeerIndex] -): seq[PeerIndex] = + unsorted: var openArray[PeerIndex] +) = mixin `cmp` proc pcmp(a, b: PeerIndex): int {.closure, raises: [].} = cmp(pool.storage[distinctBase(a)].data, pool.storage[distinctBase(b)].data) - unsorted.sorted(pcmp, order = SortOrder.Descending) + unsorted.sort(pcmp, order = SortOrder.Descending) + +proc resorted[A, B]( + pool: PeerPool[A, B], + unsorted: openArray[PeerIndex] +): seq[PeerIndex] = + var res = @unsorted + pool.resort(res) + res proc addToStorage[A, B](pool: PeerPool[A, B], item: PeerItem[A]): PeerIndex = var indexedItem = item @@ -280,8 +288,7 @@ proc deletePeerImpl[A, B]( pool.registry.del(key) if sindex >= 0: # sindex == -1 when deleting peer which was acquired (not in `sorted` array). - pool.sorted.del(sindex) - pool.sorted = pool.resort(pool.sorted) + pool.sorted.delete(sindex) # Indicate that we have an empty space pool.changeEvent.fire() @@ -340,7 +347,7 @@ proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B, pool.registry[peerKey] = pindex pool.sorted.add(pindex) - pool.sorted = pool.resort(pool.sorted) + pool.resort(pool.sorted) pitem[].data.getFuture().addCallback(onPeerClosed) case peerType @@ -479,8 +486,7 @@ proc acquireItemImpl[A, B]( of PeerType.Outgoing: inc(pool.acqOutPeersCount) - pool.sorted.del(sindex) - pool.sorted = pool.resort(pool.sorted) + pool.sorted.delete(sindex) pitem[].flags.incl(PeerFlags.Acquired) pitem[].data @@ -578,7 +584,7 @@ proc release*[A, B](pool: PeerPool[A, B], peer: A) = dec(pool.acqOutPeersCount) pool.sorted.add(pindex) - pool.sorted = pool.resort(pool.sorted) + pool.resort(pool.sorted) pool.changeEvent.fire() proc release*[A, B](pool: PeerPool[A, B], peers: openArray[A]) = @@ -725,7 +731,7 @@ iterator peers*[A, B]( # We allocate new sequence here to avoid problems with missing indices when # await operation could be part of iteration. let sortedPeers = - pool.resort(unsorted).mapIt(pool.storage[distinctBase(it)].data) + pool.resorted(unsorted).mapIt(pool.storage[distinctBase(it)].data) for peer in sortedPeers: yield peer @@ -752,7 +758,7 @@ iterator peers*[A, B]( # We allocate new sequence here to avoid problems with missing indices when # await operation could be part of iteration. let sortedPeers = - pool.resort(unsorted).mapIt(pool.storage[distinctBase(it)].data) + pool.resorted(unsorted).mapIt(pool.storage[distinctBase(it)].data) for peer in sortedPeers: yield peer @@ -828,7 +834,7 @@ iterator acquiredPeers*[A, B]( # We allocate new sequence here to avoid problems with missing indices when # await operation could be part of iteration. let sortedPeers = - pool.resort(unsorted).mapIt(pool.storage[distinctBase(it)].data) + pool.resorted(unsorted).mapIt(pool.storage[distinctBase(it)].data) for peer in sortedPeers: yield peer