diff --git a/AllTests-mainnet.md b/AllTests-mainnet.md index fc4487f37a..f6e4aec23e 100644 --- a/AllTests-mainnet.md +++ b/AllTests-mainnet.md @@ -791,6 +791,7 @@ AllTests-mainnet + Access peers by key test OK + Acquire from empty pool OK + Acquire/Sorting and consistency test OK ++ Custom filters test OK + Delete peer on release text OK + Iterators test OK + Peer lifetime test OK diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index b9d3d37970..66b03dd651 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -480,15 +480,12 @@ func netKbps*(peer: Peer): float {.inline.} = ## Returns current network throughput average value in Kbps for peer ``peer``. round(((peer.netThroughput.average / 1024) * 10_000) / 10_000) -# /!\ Must be exported to be seen by `peerCmp` -func `<`*(a, b: Peer): bool = - ## Comparison function indicating `true` if peer `a` ranks worse than peer `b` - if a.score != b.score: - a.score < b.score - elif a.netThroughput.average != b.netThroughput.average: - a.netThroughput.average < b.netThroughput.average +# /!\ Must be exported to be seen by `peerpool`. +func cmp*(a, b: Peer): int = + if a.score == b.score: + cmp(a.netThroughput.average, b.netThroughput.average) else: - system.`<`(a, b) + cmp(a.score, b.score) const maxRequestQuota = 1000000 diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index 611c86e304..6cac3cde4a 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2018-2024 Status Research & Development GmbH +# Copyright (c) 2018-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). 
@@ -7,7 +7,7 @@ {.push raises: [].} -import std/[tables, heapqueue] +import std/[tables, heapqueue, algorithm, sequtils] import chronos export tables @@ -19,9 +19,6 @@ type PeerFlags = enum Acquired, DeleteOnRelease - EventType = enum - NotEmptyEvent, NotFullEvent - PeerStatus* = enum Success, ## Peer was successfully added to PeerPool. DuplicateError, ## Peer is already present in PeerPool. @@ -29,15 +26,13 @@ type LowScoreError, ## Peer has too low score. DeadPeerError ## Peer is already dead. + PeerIndex = int + PeerItem[T] = object data: T peerType: PeerType flags: set[PeerFlags] - index: int - - PeerIndex = object - data: int - cmp: proc(a, b: PeerIndex): bool {.gcsafe, raises: [].} + index: PeerIndex PeerScoreCheckCallback*[T] = proc(peer: T): bool {.gcsafe, raises: [].} @@ -45,16 +40,14 @@ type PeerOnDeleteCallback*[T] = proc(peer: T) {.gcsafe, raises: [].} + PeerCustomFilterCallback*[T] = proc(peer: T): bool {.gcsafe, raises: [].} + PeerPool*[A, B] = ref object - incNotEmptyEvent*: AsyncEvent - outNotEmptyEvent*: AsyncEvent - incNotFullEvent*: AsyncEvent - outNotFullEvent*: AsyncEvent - incQueue: HeapQueue[PeerIndex] - outQueue: HeapQueue[PeerIndex] - registry: Table[B, PeerIndex] + changeEvent: AsyncEvent storage: seq[PeerItem[A]] - cmp: proc(a, b: PeerIndex): bool {.gcsafe, raises: [].} + registry: Table[B, PeerIndex] + sorted: seq[PeerIndex] + empties: seq[PeerIndex] scoreCheck: PeerScoreCheckCallback[A] onDeletePeer: PeerOnDeleteCallback[A] peerCounter: PeerCounterCallback @@ -68,91 +61,48 @@ type PeerPoolError* = object of CatchableError -proc `<`*(a, b: PeerIndex): bool = - ## PeerIndex ``a`` holds reference to ``cmp()`` procedure which has captured - ## PeerPool instance. 
- a.cmp(b, a) - -proc fireNotEmptyEvent[A, B](pool: PeerPool[A, B], - item: PeerItem[A]) = - case item.peerType: - of PeerType.Incoming: - pool.incNotEmptyEvent.fire() - of PeerType.Outgoing: - pool.outNotEmptyEvent.fire() - -proc fireNotFullEvent[A, B](pool: PeerPool[A, B], - item: PeerItem[A]) = - case item.peerType: - of PeerType.Incoming: - pool.incNotFullEvent.fire() - of PeerType.Outgoing: - pool.outNotFullEvent.fire() - iterator pairs*[A, B](pool: PeerPool[A, B]): (B, A) = - for peerId, peerIdx in pool.registry: - yield (peerId, pool.storage[peerIdx.data].data) - -template incomingEvent(eventType: EventType): AsyncEvent = - case eventType - of EventType.NotEmptyEvent: - pool.incNotEmptyEvent - of EventType.NotFullEvent: - pool.incNotFullEvent - -template outgoingEvent(eventType: EventType): AsyncEvent = - case eventType - of EventType.NotEmptyEvent: - pool.outNotEmptyEvent - of EventType.NotFullEvent: - pool.outNotFullEvent - -proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType, - filter: set[PeerType]) {.async: (raises: [CancelledError]).} = - if filter == {PeerType.Incoming, PeerType.Outgoing} or filter == {}: - var fut1 = incomingEvent(eventType).wait() - var fut2 = outgoingEvent(eventType).wait() - try: - try: - discard await one(fut1, fut2) - except ValueError: - raiseAssert "one precondition satisfied" - if fut1.finished(): - if not(fut2.finished()): - await fut2.cancelAndWait() - incomingEvent(eventType).clear() - else: - if not(fut1.finished()): - await fut1.cancelAndWait() - outgoingEvent(eventType).clear() - except CancelledError as exc: - var pending: seq[FutureBase] - if not(fut1.finished()): - pending.add(fut1.cancelAndWait()) - if not(fut2.finished()): - pending.add(fut2.cancelAndWait()) - await noCancel allFutures(pending) - raise exc - elif PeerType.Incoming in filter: - await incomingEvent(eventType).wait() - incomingEvent(eventType).clear() - elif PeerType.Outgoing in filter: - await outgoingEvent(eventType).wait() - 
outgoingEvent(eventType).clear() - -proc waitNotEmptyEvent[A, B](pool: PeerPool[A, B], - filter: set[PeerType]) {.async: (raises: [CancelledError], raw: true).} = - pool.waitForEvent(EventType.NotEmptyEvent, filter) - -proc waitNotFullEvent[A, B](pool: PeerPool[A, B], - filter: set[PeerType]){.async: (raises: [CancelledError], raw: true).} = - pool.waitForEvent(EventType.NotFullEvent, filter) - -proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1, - maxOutgoingPeers = -1, - scoreCheckCb: PeerScoreCheckCallback[A] = nil, - peerCounterCb: PeerCounterCallback = nil, - onDeleteCb: PeerOnDeleteCallback[A] = nil): PeerPool[A, B] = + for peerId, pindex in pool.registry: + yield (peerId, pool.storage[pindex].data) + +proc resort[A, B]( + pool: PeerPool[A, B], + unsorted: openArray[PeerIndex] +): seq[PeerIndex] = + mixin `cmp` + proc pcmp(a, b: PeerIndex): int {.closure, raises: [].} = + cmp(pool.storage[a].data, pool.storage[b].data) + unsorted.sorted(pcmp, order = SortOrder.Descending) + +proc bsearch[A, B]( + pool: PeerPool[A, B], + sorted: openArray[PeerIndex], + index: PeerIndex +): int = + mixin `cmp` + proc pcmp(a, b: PeerIndex): int {.closure, raises: [].} = + cmp(pool.storage[a].data, pool.storage[b].data) + sorted.binarySearch(index, pcmp) + +proc addToStorage[A, B](pool: PeerPool[A, B], item: PeerItem[A]): PeerIndex = + var indexedItem = item + if len(pool.empties) > 0: + indexedItem.index = pool.empties[0] + pool.storage[indexedItem.index] = indexedItem + pool.empties.del(0) + else: + indexedItem.index = PeerIndex(len(pool.storage)) + pool.storage.add(indexedItem) + indexedItem.index + +proc newPeerPool*[A, B]( + maxPeers = -1, + maxIncomingPeers = -1, + maxOutgoingPeers = -1, + scoreCheckCb: PeerScoreCheckCallback[A] = nil, + peerCounterCb: PeerCounterCallback = nil, + onDeleteCb: PeerOnDeleteCallback[A] = nil +): PeerPool[A, B] = ## Create new PeerPool. ## ## ``maxPeers`` - maximum number of peers allowed. 
All the peers which @@ -178,40 +128,35 @@ proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1, ## ## Please note, that if ``maxPeers`` is positive non-zero value, then equation ## ``maxPeers >= maxIncomingPeers + maxOutgoingPeers`` must be ``true``. - var res = PeerPool[A, B]() if maxPeers != -1: doAssert(maxPeers >= maxIncomingPeers + maxOutgoingPeers) - res.maxPeersCount = if maxPeers < 0: high(int) else: maxPeers - res.maxIncPeersCount = - if maxIncomingPeers < 0: - high(int) - else: - maxIncomingPeers - res.maxOutPeersCount = - if maxOutgoingPeers < 0: - high(int) - else: - maxOutgoingPeers - - res.incNotEmptyEvent = newAsyncEvent() - res.outNotEmptyEvent = newAsyncEvent() - res.incNotFullEvent = newAsyncEvent() - res.outNotFullEvent = newAsyncEvent() - res.incQueue = initHeapQueue[PeerIndex]() - res.outQueue = initHeapQueue[PeerIndex]() - res.registry = initTable[B, PeerIndex]() - res.scoreCheck = scoreCheckCb - res.peerCounter = peerCounterCb - res.onDeletePeer = onDeleteCb - res.storage = newSeq[PeerItem[A]]() - - proc peerCmp(a, b: PeerIndex): bool {.closure, gcsafe.} = - let p1 = res.storage[a.data].data - let p2 = res.storage[b.data].data - p1 < p2 - - res.cmp = peerCmp + let + maxPeersCount = if maxPeers < 0: high(int) else: maxPeers + maxIncPeersCount = + if maxIncomingPeers < 0: + high(int) + else: + maxIncomingPeers + maxOutPeersCount = + if maxOutgoingPeers < 0: + high(int) + else: + maxOutgoingPeers + res = PeerPool[A, B]( + changeEvent: newAsyncEvent(), + registry: initTable[B, PeerIndex](), + scoreCheck: scoreCheckCb, + peerCounter: peerCounterCb, + onDeletePeer: onDeleteCb, + maxPeersCount: maxPeersCount, + maxIncPeersCount: maxIncPeersCount, + maxOutPeersCount: maxOutPeersCount, + curIncPeersCount: 0, + curOutPeersCount: 0, + acqIncPeersCount: 0, + acqOutPeersCount: 0 + ) res proc len*[A, B](pool: PeerPool[A, B]): int = @@ -221,50 +166,86 @@ proc len*[A, B](pool: PeerPool[A, B]): int = proc lenCurrent*[A, B](pool: PeerPool[A, B], filter 
= {PeerType.Incoming, - PeerType.Outgoing}): int {.inline.} = + PeerType.Outgoing}): int = ## Returns number of registered peers in PeerPool ``pool`` which satisfies ## filter ``filter``. (if PeerType.Incoming in filter: pool.curIncPeersCount else: 0) + (if PeerType.Outgoing in filter: pool.curOutPeersCount else: 0) -proc lenAvailable*[A, B](pool: PeerPool[A, B], - filter = {PeerType.Incoming, - PeerType.Outgoing}): int {.inline.} = - ## Returns number of available peers in PeerPool ``pool`` which satisfies - ## filter ``filter``. - (if PeerType.Incoming in filter: len(pool.incQueue) else: 0) + - (if PeerType.Outgoing in filter: len(pool.outQueue) else: 0) +proc lenAvailable*[A, B]( + pool: PeerPool[A, B], + filter = {PeerType.Incoming, PeerType.Outgoing} +): int = + ## Returns number of peers available for acquisition in PeerPool + ## ``pool`` which satisfies filter ``filter``. + (if PeerType.Incoming in filter: + pool.curIncPeersCount - pool.acqIncPeersCount + else: + 0) + + (if PeerType.Outgoing in filter: + pool.curOutPeersCount - pool.acqOutPeersCount + else: + 0) + +proc lenAvailable*[A, B]( + pool: PeerPool[A, B], + filter: set[PeerType], + customFilter: PeerCustomFilterCallback[A] +): int = + ## Returns number of peers available for acquisition in PeerPool + ## ``pool`` which satisfies filter ``filter`` and custom filter + ## ``customFilter``. + ## Note: This is O(n) operation. + let available = pool.lenAvailable(filter) + var res = 0 + for pindex in pool.sorted.items(): + let item = addr(pool.storage[pindex]) + if (PeerFlags.Acquired notin item[].flags) and + (item[].peerType in filter) and + (isNil(customFilter) or customFilter(item[].data)): + inc(res) + if res == available: + # Number of customly filtered items could not be higher than number of + # peers of specific directions. 
+ break + res -proc lenAcquired*[A, B](pool: PeerPool[A, B], - filter = {PeerType.Incoming, - PeerType.Outgoing}): int {.inline.} = +proc lenAcquired*[A, B]( + pool: PeerPool[A, B], + filter = {PeerType.Incoming, PeerType.Outgoing} +): int = ## Returns number of acquired peers in PeerPool ``pool`` which satisifies ## filter ``filter``. (if PeerType.Incoming in filter: pool.acqIncPeersCount else: 0) + (if PeerType.Outgoing in filter: pool.acqOutPeersCount else: 0) -proc lenSpace*[A, B](pool: PeerPool[A, B], - filter = {PeerType.Incoming, - PeerType.Outgoing}): int {.inline.} = +proc lenSpace*[A, B]( + pool: PeerPool[A, B], + filter = {PeerType.Incoming, PeerType.Outgoing} +): int = ## Returns number of available space for peers in PeerPool ``pool`` which ## satisfies filter ``filter``. - let curPeersCount = pool.curIncPeersCount + pool.curOutPeersCount - let totalSpace = pool.maxPeersCount - curPeersCount - let incoming = min(totalSpace, pool.maxIncPeersCount - pool.curIncPeersCount) - let outgoing = min(totalSpace, pool.maxOutPeersCount - pool.curOutPeersCount) + let + curPeersCount = pool.curIncPeersCount + pool.curOutPeersCount + spaceAvailable = pool.maxPeersCount - curPeersCount + incoming = min(spaceAvailable, + pool.maxIncPeersCount - pool.curIncPeersCount) + outgoing = min(spaceAvailable, + pool.maxOutPeersCount - pool.curOutPeersCount) if filter == {PeerType.Incoming, PeerType.Outgoing}: # To avoid overflow check we need to check by ourself. 
if uint64(incoming) + uint64(outgoing) > uint64(high(int)): - min(totalSpace, high(int)) + min(spaceAvailable, high(int)) else: - min(totalSpace, incoming + outgoing) + min(spaceAvailable, incoming + outgoing) elif PeerType.Incoming in filter: incoming else: outgoing proc shortLogAvailable*[A, B](pool: PeerPool[A, B]): string = - $len(pool.incQueue) & "/" & $len(pool.outQueue) + $pool.lenAvailable({PeerType.Incoming}) & "/" + + $pool.lenAvailable({PeerType.Outgoing}) proc shortLogAcquired*[A, B](pool: PeerPool[A, B]): string = $pool.acqIncPeersCount & "/" & $pool.acqOutPeersCount @@ -293,6 +274,26 @@ proc peerDeleted[A, B](pool: PeerPool[A, B], peer: A) = if not(isNil(pool.onDeletePeer)): pool.onDeletePeer(peer) +proc deletePeerImpl[A, B]( + pool: PeerPool[A, B], + peer: A, + key: B, + pindex: PeerIndex +) = + let sindex = pool.bsearch(pool.sorted, pindex) + pool.storage[pindex] = PeerItem[A](index: PeerIndex(-1)) + pool.empties.add(pindex) + pool.registry.del(key) + if sindex >= 0: + # sindex == -1 when deleting peer which was acquired (not in `sorted` array). + pool.sorted = pool.resort(pool.sorted) + pool.sorted.del(sindex) + + # Indicate that we have an empty space + pool.changeEvent.fire() + pool.peerDeleted(peer) + pool.peerCountChanged() + proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = ## Remove ``peer`` from PeerPool ``pool``. ## @@ -300,78 +301,60 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = ## be deleted only when peer will be released. You can change this behavior ## with ``force`` option. 
mixin getKey - let key = getKey(peer) - if pool.registry.hasKey(key): - let pindex = try: pool.registry[key].data - except KeyError: raiseAssert "checked with hasKey" - var item = addr(pool.storage[pindex]) - if (PeerFlags.Acquired in item[].flags): - if not(force): - item[].flags.incl(PeerFlags.DeleteOnRelease) - else: - if item[].peerType == PeerType.Incoming: - dec(pool.curIncPeersCount) - dec(pool.acqIncPeersCount) - elif item[].peerType == PeerType.Outgoing: - dec(pool.curOutPeersCount) - dec(pool.acqOutPeersCount) - - # Indicate that we have an empty space - pool.fireNotFullEvent(item[]) - # Cleanup storage with default item, and removing key from hashtable. - pool.storage[pindex] = PeerItem[A]() - pool.registry.del(key) - pool.peerDeleted(peer) - pool.peerCountChanged() + let + key = peer.getKey() + pindex = + block: + let res = pool.registry.getOrDefault(key, PeerIndex(-1)) + if res == PeerIndex(-1): + return false + res + + var item = addr(pool.storage[pindex]) + if (PeerFlags.Acquired in item[].flags): + if not(force): + item[].flags.incl(PeerFlags.DeleteOnRelease) else: - if item[].peerType == PeerType.Incoming: - # If peer is available, then its copy present in heapqueue, so we need - # to remove it. - for i in 0 ..< len(pool.incQueue): - if pool.incQueue[i].data == pindex: - pool.incQueue.del(i) - break + case item[].peerType + of PeerType.Incoming: + dec(pool.acqIncPeersCount) dec(pool.curIncPeersCount) - elif item[].peerType == PeerType.Outgoing: - # If peer is available, then its copy present in heapqueue, so we need - # to remove it. - for i in 0 ..< len(pool.outQueue): - if pool.outQueue[i].data == pindex: - pool.outQueue.del(i) - break + of PeerType.Outgoing: + dec(pool.acqOutPeersCount) dec(pool.curOutPeersCount) - - # Indicate that we have an empty space - pool.fireNotFullEvent(item[]) - # Cleanup storage with default item, and removing key from hashtable. 
- pool.storage[pindex] = PeerItem[A]() - pool.registry.del(key) - pool.peerDeleted(peer) - pool.peerCountChanged() - true + pool.deletePeerImpl(peer, key, pindex) else: - false + case item[].peerType + of PeerType.Incoming: + dec(pool.curIncPeersCount) + of PeerType.Outgoing: + dec(pool.curOutPeersCount) + pool.deletePeerImpl(peer, key, pindex) + + true proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B, peerType: PeerType) = + mixin getFuture proc onPeerClosed(udata: pointer) {.gcsafe, raises: [].} = discard pool.deletePeer(peer) - let item = PeerItem[A](data: peer, peerType: peerType, - index: len(pool.storage)) - pool.storage.add(item) - var pitem = addr(pool.storage[^1]) - let pindex = PeerIndex(data: item.index, cmp: pool.cmp) + let + item = PeerItem[A](data: peer, peerType: peerType) + pindex = pool.addToStorage(item) + pitem = addr(pool.storage[pindex]) + pool.registry[peerKey] = pindex + pool.sorted.add(pindex) + pool.sorted = pool.resort(pool.sorted) + pitem[].data.getFuture().addCallback(onPeerClosed) - if peerType == PeerType.Incoming: + case peerType + of PeerType.Incoming: inc(pool.curIncPeersCount) - pool.incQueue.push(pindex) - pool.incNotEmptyEvent.fire() - elif peerType == PeerType.Outgoing: + of PeerType.Outgoing: inc(pool.curOutPeersCount) - pool.outQueue.push(pindex) - pool.outNotEmptyEvent.fire() + pool.changeEvent.fire() pool.peerCountChanged() proc checkPeer*[A, B](pool: PeerPool[A, B], peer: A): PeerStatus {.inline.} = @@ -395,8 +378,11 @@ proc checkPeer*[A, B](pool: PeerPool[A, B], peer: A): PeerStatus {.inline.} = else: PeerStatus.DuplicateError -proc addPeerNoWait*[A, B](pool: PeerPool[A, B], - peer: A, peerType: PeerType): PeerStatus = +proc addPeerNoWait*[A, B]( + pool: PeerPool[A, B], + peer: A, + peerType: PeerType +): PeerStatus = ## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``. 
## ## Procedure returns ``PeerStatus`` @@ -428,41 +414,21 @@ proc addPeerNoWait*[A, B](pool: PeerPool[A, B], else: PeerStatus.NoSpaceError -proc getPeerSpaceMask[A, B](pool: PeerPool[A, B], - peerType: PeerType): set[PeerType] {.inline.} = - ## This procedure returns set of events which you need to wait to get empty - ## space for peer type ``peerType``. This set can be used for call to - ## ``waitNotFullEvent()``. - case peerType: - of PeerType.Incoming: - if pool.maxIncPeersCount >= pool.maxPeersCount: - # If maximum number of `incoming` peers is only limited by - # maximum number of peers, then we could wait for both events. - # It means that we do not care about what peer will left pool. - {PeerType.Incoming, PeerType.Outgoing} - else: - # Otherwise we could wait only for `incoming` event - {PeerType.Incoming} - of PeerType.Outgoing: - if pool.maxOutPeersCount >= pool.maxPeersCount: - # If maximum number of `outgoing` peers is only limited by - # maximum number of peers, then we could wait for both events. - # It means that we do not care about what peer will left pool. - {PeerType.Incoming, PeerType.Outgoing} - else: - # Otherwise we could wait only for `outgoing` event - {PeerType.Outgoing} - -proc waitForEmptySpace*[A, B](pool: PeerPool[A, B], - peerType: PeerType) {.async: (raises: [CancelledError]).} = +proc waitForEmptySpace*[A, B]( + pool: PeerPool[A, B], + peerType: PeerType +) {.async: (raises: [CancelledError]).} = ## This procedure will block until ``pool`` will have an empty space for peer ## of type ``peerType``. 
- let mask = pool.getPeerSpaceMask(peerType) while pool.lenSpace({peerType}) == 0: - await pool.waitNotFullEvent(mask) + await pool.changeEvent.wait() + pool.changeEvent.clear() -proc addPeer*[A, B](pool: PeerPool[A, B], - peer: A, peerType: PeerType): Future[PeerStatus] {.async: (raises: [CancelledError]).} = +proc addPeer*[A, B]( + pool: PeerPool[A, B], + peer: A, + peerType: PeerType +): Future[PeerStatus] {.async: (raises: [CancelledError]).} = ## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``. ## ## This procedure will wait for an empty space in PeerPool ``pool``, if @@ -474,134 +440,176 @@ proc addPeer*[A, B](pool: PeerPool[A, B], ## ## Procedure returns (PeerStatus.Success) on success. mixin getKey - let res = + + template check(peer: untyped) = + let res = pool.checkPeer(peer) + if res != PeerStatus.Success: + return res + + while pool.lenSpace({peerType}) == 0: + peer.check() + await pool.changeEvent.wait() + pool.changeEvent.clear() + + # Because we could wait for a long time we need to check peer one more + # time to avoid race condition. + peer.check() + + pool.addPeerImpl(peer, peer.getKey(), peerType) + PeerStatus.Success + +proc acquireItemImpl[A, B]( + pool: PeerPool[A, B], + filter: set[PeerType], + customFilter: PeerCustomFilterCallback[A] = nil +): A = + let (sindex, pitem) = block: - let res1 = pool.checkPeer(peer) - if res1 != PeerStatus.Success: - res1 - else: - let mask = pool.getPeerSpaceMask(peerType) - # We going to block here until ``pool`` will not have free space, - # for our type of peer. - while pool.lenSpace({peerType}) == 0: - await pool.waitNotFullEvent(mask) - # Because we could wait for a long time we need to check peer one more - # time to avoid race condition. 
- let res2 = pool.checkPeer(peer) - if res2 == PeerStatus.Success: - let peerKey = peer.getKey() - pool.addPeerImpl(peer, peerKey, peerType) - PeerStatus.Success - else: - res2 - return res - -proc acquireItemImpl[A, B](pool: PeerPool[A, B], - filter: set[PeerType]): A {.inline.} = - doAssert((len(pool.outQueue) > 0) or (len(pool.incQueue) > 0)) - let pindex = - if filter == {PeerType.Incoming, PeerType.Outgoing}: - if len(pool.outQueue) > 0 and len(pool.incQueue) > 0: - # `<` here is the `PeerIndex` implementation (`HeapQueue` uses `<`), - # which then flips the arguments to rank `>` on `A` using `pool.cmp` - if pool.incQueue[0] < pool.outQueue[0]: - inc(pool.acqIncPeersCount) - let item = pool.incQueue.pop() - item.data - else: - inc(pool.acqOutPeersCount) - let item = pool.outQueue.pop() - item.data - else: - if len(pool.outQueue) > 0: - inc(pool.acqOutPeersCount) - let item = pool.outQueue.pop() - item.data - else: - inc(pool.acqIncPeersCount) - let item = pool.incQueue.pop() - item.data - else: - if PeerType.Outgoing in filter: - inc(pool.acqOutPeersCount) - let item = pool.outQueue.pop() - item.data - else: - inc(pool.acqIncPeersCount) - let item = pool.incQueue.pop() - item.data - var pitem = addr(pool.storage[pindex]) + var + rindex: PeerIndex = PeerIndex(-1) + res: ptr PeerItem[A] = nil + for sindex, pindex in pool.sorted.pairs(): + res = addr(pool.storage[pindex]) + if (PeerFlags.Acquired notin res[].flags) and + (res[].peerType in filter) and + (isNil(customFilter) or customFilter(res[].data)): + rindex = PeerIndex(sindex) + break + (rindex, res) + + doAssert(not(isNil(pitem))) doAssert(PeerFlags.Acquired notin pitem[].flags) + + case pitem[].peerType + of PeerType.Incoming: + inc(pool.acqIncPeersCount) + of PeerType.Outgoing: + inc(pool.acqOutPeersCount) + + pool.sorted.del(sindex) + pool.sorted = pool.resort(pool.sorted) + pitem[].flags.incl(PeerFlags.Acquired) pitem[].data -proc acquire*[A, B](pool: PeerPool[A, B], - filter = {PeerType.Incoming, - 
PeerType.Outgoing}): Future[A] {.async: (raises: [CancelledError]).} = +proc acquire*[A, B]( + pool: PeerPool[A, B], + filter = {PeerType.Incoming, PeerType.Outgoing} +): Future[A] {.async: (raises: [CancelledError]).} = ## Acquire peer from PeerPool ``pool``, which match the filter ``filter``. + ## This procedure will wait for peer which satisfy filter will become + ## available for acquisition. mixin getKey doAssert(filter != {}, "Filter must not be empty") while true: if pool.lenAvailable(filter) == 0: - await pool.waitNotEmptyEvent(filter) + await pool.changeEvent.wait() + pool.changeEvent.clear() else: - return pool.acquireItemImpl(filter) - -proc acquireNoWait*[A, B](pool: PeerPool[A, B], - filter = {PeerType.Incoming, - PeerType.Outgoing} - ): A {.raises: [PeerPoolError].} = + return pool.acquireItemImpl(filter, nil) + +proc acquire*[A, B]( + pool: PeerPool[A, B], + filter: set[PeerType], + customFilter: PeerCustomFilterCallback[A] +): Future[A] {.async: (raises: [CancelledError]).} = + ## Acquire peer from PeerPool ``pool``, which match the filter ``filter`` and + ## custom filter ``customFilter``. This procedure will wait for peer which + ## satisfy filters will become available for acquisition. + mixin getKey + doAssert(filter != {}, "Filter must not be empty") + while true: + if pool.lenAvailable(filter, customFilter) == 0: + await pool.changeEvent.wait() + pool.changeEvent.clear() + else: + return pool.acquireItemImpl(filter, customFilter) + +proc acquireNoWait*[A, B]( + pool: PeerPool[A, B], + filter = {PeerType.Incoming, PeerType.Outgoing} +): A {.raises: [PeerPoolError].} = + ## Acquire peer from PeerPool ``pool``, which match the filter ``filter`` + ## without waiting, this procedure will raise PeerPoolError if no peers + ## which satisfy filters are available for acquisition. 
doAssert(filter != {}, "Filter must not be empty") if pool.lenAvailable(filter) < 1: raise newException(PeerPoolError, "Not enough peers in pool") - pool.acquireItemImpl(filter) + pool.acquireItemImpl(filter, nil) + +proc acquireNoWait*[A, B]( + pool: PeerPool[A, B], + filter: set[PeerType], + customFilter: PeerCustomFilterCallback[A] +): A {.raises: [PeerPoolError].} = + ## Acquire peer from PeerPool ``pool``, which match the filter ``filter`` and + ## custom filter ``customFilter`` without waiting, this procedure will raise + ## PeerPoolError if no peers which satisfy filters are available for + ## acquisition. + doAssert(filter != {}, "Filter must not be empty") + if pool.lenAvailable(filter, customFilter) < 1: + raise newException(PeerPoolError, "Not enough peers in pool") + pool.acquireItemImpl(filter, customFilter) proc release*[A, B](pool: PeerPool[A, B], peer: A) = ## Release peer ``peer`` back to PeerPool ``pool`` mixin getKey - let key = getKey(peer) - var titem = pool.registry.getOrDefault(key, PeerIndex(data: -1)) - if titem.data >= 0: - let pindex = titem.data - var item = addr(pool.storage[pindex]) - if PeerFlags.Acquired in item[].flags: - if not(pool.checkPeerScore(peer)): - item[].flags.incl(DeleteOnRelease) - if PeerFlags.DeleteOnRelease in item[].flags: - # We do not care about result here because peer is present in registry - # and has all proper flags set. 
- discard pool.deletePeer(peer, force = true) - else: - item[].flags.excl(PeerFlags.Acquired) - case item[].peerType - of PeerType.Incoming: - pool.incQueue.push(titem) - dec(pool.acqIncPeersCount) - of PeerType.Outgoing: - pool.outQueue.push(titem) - dec(pool.acqOutPeersCount) - pool.fireNotEmptyEvent(item[]) - -proc release*[A, B](pool: PeerPool[A, B], peers: openArray[A]) {.inline.} = + let + key = peer.getKey() + pindex = + block: + let res = pool.registry.getOrDefault(key, PeerIndex(-1)) + if res == PeerIndex(-1): + return + res + item = addr(pool.storage[pindex]) + + if PeerFlags.Acquired in item[].flags: + if not(pool.checkPeerScore(peer)): + item[].flags.incl(DeleteOnRelease) + if PeerFlags.DeleteOnRelease in item[].flags: + case item[].peerType + of PeerType.Incoming: + dec(pool.acqIncPeersCount) + dec(pool.curIncPeersCount) + of PeerType.Outgoing: + dec(pool.acqOutPeersCount) + dec(pool.curOutPeersCount) + pool.deletePeerImpl(peer, key, pindex) + else: + item[].flags.excl(PeerFlags.Acquired) + case item[].peerType + of PeerType.Incoming: + dec(pool.acqIncPeersCount) + of PeerType.Outgoing: + dec(pool.acqOutPeersCount) + + pool.sorted.add(pindex) + pool.sorted = pool.resort(pool.sorted) + pool.changeEvent.fire() + +proc release*[A, B](pool: PeerPool[A, B], peers: openArray[A]) = ## Release array of peers ``peers`` back to PeerPool ``pool``. for item in peers: pool.release(item) -proc acquire*[A, B](pool: PeerPool[A, B], - number: int, - filter = {PeerType.Incoming, - PeerType.Outgoing}): Future[seq[A]] {.async: (raises: [CancelledError]).} = +proc acquire*[A, B]( + pool: PeerPool[A, B], + number: int, + filter = {PeerType.Incoming, PeerType.Outgoing} +): Future[seq[A]] {.async: (raises: [CancelledError]).} = ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the ## filter ``filter``. 
doAssert(filter != {}, "Filter must not be empty") - var peers = newSeq[A]() + var peers: seq[A] try: if number > 0: while true: if len(peers) >= number: break if pool.lenAvailable(filter) == 0: - await pool.waitNotEmptyEvent(filter) + await pool.changeEvent.wait() + pool.changeEvent.clear() else: peers.add(pool.acquireItemImpl(filter)) except CancelledError as exc: @@ -611,96 +619,235 @@ proc acquire*[A, B](pool: PeerPool[A, B], pool.release(item) peers.setLen(0) raise exc - return peers + peers + +proc acquire*[A, B]( + pool: PeerPool[A, B], + number: int, + filter: set[PeerType], + customFilter: PeerCustomFilterCallback[A] +): Future[seq[A]] {.async: (raises: [CancelledError]).} = + ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the + ## filter ``filter`` and custom filter ``customFilter``. This procedure will + ## wait for ``number`` of peers which satisfy filter will become available + ## and acquired. + doAssert(filter != {}, "Filter must not be empty") + var peers: seq[A] + try: + if number > 0: + while true: + if len(peers) >= number: + break + if pool.lenAvailable(filter, customFilter) == 0: + await pool.changeEvent.wait() + pool.changeEvent.clear() + else: + peers.add(pool.acquireItemImpl(filter, customFilter)) + except CancelledError as exc: + # If we got cancelled, we need to return all the acquired peers back to + # pool. + for item in peers: + pool.release(item) + peers.setLen(0) + raise exc + peers -proc acquireNoWait*[A, B](pool: PeerPool[A, B], - number: int, - filter = {PeerType.Incoming, - PeerType.Outgoing}): seq[A] = +proc acquireNoWait*[A, B]( + pool: PeerPool[A, B], + number: int, + filter = {PeerType.Incoming, PeerType.Outgoing} +): seq[A] = ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the - ## filter ``filter``. + ## filter ``filter``. This procedure does not wait for peers, it will raise + ## `PeerPoolError` if peers matching the filters are not available. 
doAssert(filter != {}, "Filter must not be empty") - var peers = newSeq[A]() + var peers: seq[A] if pool.lenAvailable(filter) < number: raise newException(PeerPoolError, "Not enough peers in pool") for i in 0 ..< number: peers.add(pool.acquireItemImpl(filter)) - return peers + peers + +proc acquireNoWait*[A, B]( + pool: PeerPool[A, B], + number: int, + filter: set[PeerType], + customFilter: PeerCustomFilterCallback[A] +): seq[A] = + ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the + ## filter ``filter`` and custom filter ``filter``. This procedure does not + ## wait for peers, it will raise `PeerPoolError` if peers matching the + ## filters are not available. + doAssert(filter != {}, "Filter must not be empty") + var peers: seq[A] + if pool.lenAvailable(filter, customFilter) < number: + raise newException(PeerPoolError, "Not enough peers in pool") + for i in 0 ..< number: + peers.add(pool.acquireItemImpl(filter, customFilter)) + peers -proc acquireIncomingPeer*[A, B](pool: PeerPool[A, B]): Future[A] {.inline.} = +proc acquireIncomingPeer*[A, B]( + pool: PeerPool[A, B] +): Future[A] {.async: (raises: [CancelledError], raw: true).} = ## Acquire single incoming peer from PeerPool ``pool``. pool.acquire({PeerType.Incoming}) -proc acquireOutgoingPeer*[A, B](pool: PeerPool[A, B]): Future[A] {.inline.} = +proc acquireOutgoingPeer*[A, B]( + pool: PeerPool[A, B] +): Future[A] {.async: (raises: [CancelledError], raw: true).} = ## Acquire single outgoing peer from PeerPool ``pool``. pool.acquire({PeerType.Outgoing}) -proc acquireIncomingPeers*[A, B](pool: PeerPool[A, B], - number: int): Future[seq[A]] {.inline.} = +proc acquireIncomingPeers*[A, B]( + pool: PeerPool[A, B], + number: int +): Future[seq[A]] {.async: (raises: [CancelledError], raw: true).} = ## Acquire ``number`` number of incoming peers from PeerPool ``pool``. 
pool.acquire(number, {PeerType.Incoming}) -proc acquireOutgoingPeers*[A, B](pool: PeerPool[A, B], - number: int): Future[seq[A]] {.inline.} = +proc acquireOutgoingPeers*[A, B]( + pool: PeerPool[A, B], + number: int +): Future[seq[A]] {.async: (raises: [CancelledError], raw: true).} = ## Acquire ``number`` number of outgoing peers from PeerPool ``pool``. pool.acquire(number, {PeerType.Outgoing}) -iterator peers*[A, B](pool: PeerPool[A, B], - filter = {PeerType.Incoming, - PeerType.Outgoing}): A = +iterator peers*[A, B]( + pool: PeerPool[A, B], + filter = {PeerType.Incoming, PeerType.Outgoing} +): A = + ## Iterate over sorted list of peers. + ## + ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values + ## will be first. + ## + ## NOTE: While it safe to use this iterator in combination with await calls, + ## consider that right after `await` call, PeerPool could become different + ## from the snapshot this iterator provides. + var unsorted: seq[PeerIndex] + for pindex in pool.registry.values(): + if pool.storage[pindex].peerType in filter: + unsorted.add(pindex) + + # We allocate new sequence here to avoid problems with missing indices when + # await operation could be part of iteration. + let sortedPeers = pool.resort(unsorted).mapIt(pool.storage[it].data) + for peer in sortedPeers: + yield peer + +iterator peers*[A, B]( + pool: PeerPool[A, B], + filter: set[PeerType], + customFilter: PeerCustomFilterCallback[A] +): A = ## Iterate over sorted list of peers. ## ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. 
- var sorted = initHeapQueue[PeerIndex]() - for peerIdx in pool.registry.values(): - if pool.storage[peerIdx.data].peerType in filter: - sorted.push(peerIdx) - while len(sorted) > 0: - let peerIdx = sorted.pop() - yield pool.storage[peerIdx.data].data - -iterator availablePeers*[A, B](pool: PeerPool[A, B], - filter = {PeerType.Incoming, - PeerType.Outgoing}): A = + ## + ## NOTE: While it safe to use this iterator in combination with await calls, + ## consider that right after `await` call, PeerPool could become different + ## from the snapshot this iterator provides. + var unsorted: seq[PeerIndex] + for pindex in pool.registry.values(): + let item = addr(pool.storage[pindex]) + if (item[].peerType in filter) and + (isNil(customFilter) or customFilter(item[].data)): + unsorted.add(pindex) + + # We allocate new sequence here to avoid problems with missing indices when + # await operation could be part of iteration. + let sortedPeers = pool.resort(unsorted).mapIt(pool.storage[it].data) + for peer in sortedPeers: + yield peer + +iterator availablePeers*[A, B]( + pool: PeerPool[A, B], + filter = {PeerType.Incoming, PeerType.Outgoing} +): A = ## Iterate over sorted list of available peers. ## ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. - var sorted = initHeapQueue[PeerIndex]() - for peerIdx in pool.registry.values(): - if (PeerFlags.Acquired notin pool.storage[peerIdx.data].flags) and - (pool.storage[peerIdx.data].peerType in filter): - sorted.push(peerIdx) - while len(sorted) > 0: - let peerIdx = sorted.pop() - yield pool.storage[peerIdx.data].data - -iterator acquiredPeers*[A, B](pool: PeerPool[A, B], - filter = {PeerType.Incoming, - PeerType.Outgoing}): A = + ## + ## NOTE: While it safe to use this iterator in combination with await calls, + ## consider that right after `await` call, PeerPool could become different + ## from the snapshot this iterator provides. 
+ + # We allocate new sequence here to avoid problems with missing indices when + # await operation could be part of iteration. + let sortedPeers = + pool.sorted.filterIt( + (PeerFlags.Acquired notin pool.storage[it].flags) and + (pool.storage[it].peerType in filter)). + mapIt(pool.storage[it].data) + + for peer in sortedPeers: + yield peer + +iterator availablePeers*[A, B]( + pool: PeerPool[A, B], + filter: set[PeerType], + customFilter: PeerCustomFilterCallback[A] +): A = + ## Iterate over sorted list of available peers. + ## + ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values + ## will be first. + ## + ## NOTE: While it safe to use this iterator in combination with await calls, + ## consider that right after `await` call, PeerPool could become different + ## from the snapshot this iterator provides. + + # We allocate new sequence here to avoid problems with missing indices when + # await operation could be part of iteration. + let sortedPeers = + pool.sorted.filterIt( + (PeerFlags.Acquired notin pool.storage[it].flags) and + (pool.storage[it].peerType in filter) and + (isNil(customFilter) or customFilter(pool.storage[it].data))). + mapIt(pool.storage[it].data) + + for peer in sortedPeers: + yield peer + +iterator acquiredPeers*[A, B]( + pool: PeerPool[A, B], + filter = {PeerType.Incoming, PeerType.Outgoing} +): A = ## Iterate over sorted list of acquired (non-available) peers. ## ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. 
- var sorted = initHeapQueue[PeerIndex]() - for peerIdx in pool.registry.values(): - if (PeerFlags.Acquired in pool.storage[peerIdx.data].flags) and - (pool.storage[peerIdx.data].peerType in filter): - sorted.push(peerIdx) - while len(sorted) > 0: - let peerIdx = sorted.pop() - yield pool.storage[peerIdx.data].data - -proc `[]`*[A, B](pool: PeerPool[A, B], key: B): A {.inline, raises: [KeyError].} = + ## + ## NOTE: While it safe to use this iterator in combination with await calls, + ## consider that right after `await` call, PeerPool could become different + ## from the snapshot this iterator provides. + var unsorted: seq[PeerIndex] + for pindex in pool.registry.values(): + if (PeerFlags.Acquired in pool.storage[pindex].flags) and + (pool.storage[pindex].peerType in filter): + unsorted.add(pindex) + + # We allocate new sequence here to avoid problems with missing indices when + # await operation could be part of iteration. + let sortedPeers = pool.resort(unsorted).mapIt(pool.storage[it].data) + for peer in sortedPeers: + yield peer + +proc `[]`*[A, B]( + pool: PeerPool[A, B], + key: B +): A {.inline, raises: [KeyError].} = ## Retrieve peer with key ``key`` from PeerPool ``pool``. - let pindex = pool.registry[key] - pool.storage[pindex.data] + pool.storage[pool.registry[key]].data -proc `[]`*[A, B](pool: var PeerPool[A, B], key: B): var A {.inline, raises: [KeyError].} = +proc `[]`*[A, B]( + pool: var PeerPool[A, B], + key: B +): var A {.inline, raises: [KeyError].} = ## Retrieve peer with key ``key`` from PeerPool ``pool``. - let pindex = pool.registry[key] - pool.storage[pindex.data].data + pool.storage[pool.registry[key]].data proc hasPeer*[A, B](pool: PeerPool[A, B], key: B): bool {.inline.} = ## Returns ``true`` if peer with ``key`` present in PeerPool ``pool``. @@ -710,9 +857,9 @@ proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B): A {.inline.} = ## Retrieves the peer from PeerPool ``pool`` using key ``key``. 
If peer is ## not present, default initialization value for type ``A`` is returned ## (e.g. 0 for any integer type). - let pindex = pool.registry.getOrDefault(key, PeerIndex(data: -1)) - if pindex.data >= 0: - pool.storage[pindex.data].data + let pindex = pool.registry.getOrDefault(key, PeerIndex(-1)) + if pindex != PeerIndex(-1): + pool.storage[pindex].data else: A() @@ -720,29 +867,32 @@ proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B, default: A): A {.inline.} = ## Retrieves the peer from PeerPool ``pool`` using key ``key``. If peer is ## not present, default value ``default`` is returned. - let pindex = pool.registry.getOrDefault(key, PeerIndex(data: -1)) - if pindex.data >= 0: - pool.storage[pindex.data].data + let pindex = pool.registry.getOrDefault(key, PeerIndex(-1)) + if pindex != PeerIndex(-1): + pool.storage[pindex].data else: default proc clear*[A, B](pool: PeerPool[A, B]) = ## Performs PeerPool's ``pool`` storage and counters reset. - pool.incQueue.clear() - pool.outQueue.clear() pool.registry.clear() + + pool.sorted.reset() for i in 0 ..< len(pool.storage): pool.storage[i] = PeerItem[A]() - pool.storage.setLen(0) + pool.empties.reset() + pool.storage.reset() pool.curIncPeersCount = 0 pool.curOutPeersCount = 0 pool.acqIncPeersCount = 0 pool.acqOutPeersCount = 0 -proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async: (raises: [CancelledError]).} = +proc clearSafe*[A, B]( + pool: PeerPool[A, B] +) {.async: (raises: [CancelledError]).} = ## Performs "safe" clear. Safe means that it first acquires all the peers ## in PeerPool, and only after that it will reset storage. 
- var acquired = newSeq[A]() + var acquired: seq[A] while len(pool.registry) > len(acquired): var peers = await pool.acquire(len(pool.registry) - len(acquired)) for item in peers: diff --git a/tests/test_peer_pool.nim b/tests/test_peer_pool.nim index bf648e07b1..16579b657e 100644 --- a/tests/test_peer_pool.nim +++ b/tests/test_peer_pool.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2019-2024 Status Research & Development GmbH +# Copyright (c) 2019-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -8,19 +8,17 @@ {.push raises: [].} {.used.} -import std/[random, heapqueue, tables] -import chronos +import std/[random, heapqueue, tables, sequtils, strutils] +import chronos, chronos/unittest2/asynctests import ../beacon_chain/networking/peer_pool import ./testutil -template closureScope(raisesAnnotation: untyped, body: untyped): untyped = - (proc() {.raises: raisesAnnotation} = body)() - type PeerTestID = string PeerTest = object id: PeerTestID weight: int + metadata: uint64 future: Future[void] func getKey(peer: PeerTest): PeerTestID = @@ -29,6 +27,9 @@ func getKey(peer: PeerTest): PeerTestID = func getFuture(peer: PeerTest): Future[void] = peer.future +func getMetadata(peer: PeerTest): uint64 = + peer.metadata + func `<`(a, b: PeerTest): bool = `<`(a.weight, b.weight) @@ -36,6 +37,14 @@ proc init*(t: typedesc[PeerTest], id: string = "", weight: int = 0): PeerTest = PeerTest(id: id, weight: weight, future: newFuture[void]()) +proc init*(t: typedesc[PeerTest], id: string = "", + weight: int = 0, metadata: uint64): PeerTest = + PeerTest(id: id, weight: weight, future: newFuture[void](), + metadata: metadata) + +proc toString(a: openArray[PeerTest]): string = + "[" & a.mapIt(it.getKey()).join(",") & "]" + proc close(peer: 
PeerTest) = peer.future.complete() @@ -241,7 +250,7 @@ suite "PeerPool testing suite": itemFut23.finished == false itemFut24.finished == false - test "Acquire/Sorting and consistency test": closureScope([CatchableError]): + test "Acquire/Sorting and consistency test": const TestsCount = 1000 MaxNumber = 1_000_000 @@ -310,61 +319,140 @@ suite "PeerPool testing suite": check waitFor(testAcquireRelease()) == TestsCount - test "deletePeer() test": - proc testDeletePeer(): Future[bool] {.async.} = - var pool = newPeerPool[PeerTest, PeerTestID]() - var peer = PeerTest.init("deletePeer") - - ## Delete available peer - doAssert(pool.addPeerNoWait(peer, - PeerType.Incoming) == PeerStatus.Success) - doAssert(pool.len == 1) - doAssert(pool.lenAvailable == 1) - doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) - doAssert(pool.lenAvailable({PeerType.Incoming}) == 1) - doAssert(pool.deletePeer(peer) == true) - doAssert(pool.len == 0) - doAssert(pool.lenAvailable == 0) - doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) - doAssert(pool.lenAvailable({PeerType.Incoming}) == 0) + asyncTest "deletePeer() test": + var pool = newPeerPool[PeerTest, PeerTestID]() - ## Delete acquired peer - peer = PeerTest.init("closingPeer") - doAssert(pool.addPeerNoWait(peer, - PeerType.Incoming) == PeerStatus.Success) - doAssert(pool.len == 1) - doAssert(pool.lenAvailable == 1) - doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) - doAssert(pool.lenAvailable({PeerType.Incoming}) == 1) - var apeer = await pool.acquire() - doAssert(pool.deletePeer(peer) == true) - doAssert(pool.len == 1) - doAssert(pool.lenAvailable == 0) - doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) - doAssert(pool.lenAvailable({PeerType.Incoming}) == 0) + ## Delete available peer + block: + let peer = PeerTest.init("deletePeer") + check: + pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success + pool.len == 1 + pool.lenAvailable == 1 + pool.lenAvailable({PeerType.Outgoing}) == 0 + 
pool.lenAvailable({PeerType.Incoming}) == 1 + pool.deletePeer(peer) == true + pool.len == 0 + pool.lenAvailable == 0 + pool.lenAvailable({PeerType.Outgoing}) == 0 + pool.lenAvailable({PeerType.Incoming}) == 0 + + ## Delete acquired peer + block: + let peer = PeerTest.init("closingPeer") + check: + pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success + pool.len == 1 + pool.lenAvailable == 1 + pool.lenAvailable({PeerType.Outgoing}) == 0 + pool.lenAvailable({PeerType.Incoming}) == 1 + let apeer = await pool.acquire() + check: + pool.deletePeer(peer) == true + pool.len == 1 + pool.lenAvailable == 0 + pool.lenAvailable({PeerType.Outgoing}) == 0 + pool.lenAvailable({PeerType.Incoming}) == 0 pool.release(apeer) - doAssert(pool.len == 0) - doAssert(pool.lenAvailable == 0) - doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) - doAssert(pool.lenAvailable({PeerType.Incoming}) == 0) + check: + pool.len == 0 + pool.lenAvailable == 0 + pool.lenAvailable({PeerType.Outgoing}) == 0 + pool.lenAvailable({PeerType.Incoming}) == 0 + + ## Force delete acquired peer + block: + let peer = PeerTest.init("closingPeer") + check: + pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success + pool.len == 1 + pool.lenAvailable == 1 + pool.lenAvailable({PeerType.Outgoing}) == 0 + pool.lenAvailable({PeerType.Incoming}) == 1 + let apeer = await pool.acquire() + check: + pool.deletePeer(apeer, true) == true + pool.len == 0 + pool.lenAvailable == 0 + pool.lenAvailable({PeerType.Outgoing}) == 0 + pool.lenAvailable({PeerType.Incoming}) == 0 + + ## Delete single available peer in pool full of peers + block: + for i in 0 ..< 100: + let peer = PeerTest.init("peer" & $i) + check pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success + for i in 100 ..< 200: + let peer = PeerTest.init("peer" & $i) + check pool.addPeerNoWait(peer, PeerType.Outgoing) == PeerStatus.Success + check: + pool.len == 200 + pool.lenAvailable == 200 + pool.lenAvailable({PeerType.Outgoing}) == 100 + 
pool.lenAvailable({PeerType.Incoming}) == 100 + for i in 0 ..< 20: + let + index = 90 + i + peerKey = "peer" & $index + dpeer = pool.getOrDefault(peerKey, default(PeerTest)) + check: + pool.deletePeer(dpeer) == true + pool.hasPeer(peerKey) == false + check: + pool.len == 180 + pool.lenAvailable == 180 + pool.lenAvailable({PeerType.Outgoing}) == 90 + pool.lenAvailable({PeerType.Incoming}) == 90 + pool.clear() + + ## Delete single acquired peer in pool full of peers + block: + for i in 0 ..< 100: + let peer = PeerTest.init("peer" & $i) + check pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success + for i in 100 ..< 200: + let peer = PeerTest.init("peer" & $i) + check pool.addPeerNoWait(peer, PeerType.Outgoing) == PeerStatus.Success + check: + pool.len == 200 + pool.lenAvailable == 200 + pool.lenAvailable({PeerType.Outgoing}) == 100 + pool.lenAvailable({PeerType.Incoming}) == 100 - ## Force delete acquired peer - peer = PeerTest.init("closingPeer") - doAssert(pool.addPeerNoWait(peer, - PeerType.Incoming) == PeerStatus.Success) - doAssert(pool.len == 1) - doAssert(pool.lenAvailable == 1) - doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) - doAssert(pool.lenAvailable({PeerType.Incoming}) == 1) - apeer = await pool.acquire() - doAssert(pool.deletePeer(peer, true) == true) - doAssert(pool.len == 0) - doAssert(pool.lenAvailable == 0) - doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) - doAssert(pool.lenAvailable({PeerType.Incoming}) == 0) + for i in 0 ..< 20: + let apeer = await pool.acquire() + check pool.deletePeer(apeer) == true + pool.release(apeer) + check pool.hasPeer(apeer.getKey()) == false - result = true - check waitFor(testDeletePeer()) == true + check: + pool.len == 180 + pool.lenAvailable == 180 + pool.clear() + + ## Force delete single acquired peer in pool full of peers + block: + for i in 0 ..< 100: + let peer = PeerTest.init("peer" & $i) + check pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success + for i in 100 ..< 200: 
+ let peer = PeerTest.init("peer" & $i) + check pool.addPeerNoWait(peer, PeerType.Outgoing) == PeerStatus.Success + check: + pool.len == 200 + pool.lenAvailable == 200 + pool.lenAvailable({PeerType.Outgoing}) == 100 + pool.lenAvailable({PeerType.Incoming}) == 100 + + for i in 0 ..< 20: + let apeer = await pool.acquire() + check: + pool.deletePeer(apeer, true) == true + pool.hasPeer(apeer.getKey()) == false + + check: + pool.len == 180 + pool.lenAvailable == 180 test "Peer lifetime test": proc testPeerLifetime(): Future[bool] {.async.} = @@ -415,7 +503,7 @@ suite "PeerPool testing suite": check waitFor(testPeerLifetime()) == true - test "Safe/Clear test": closureScope([CatchableError]): + test "Safe/Clear test": var pool = newPeerPool[PeerTest, PeerTestID]() var peer1 = PeerTest.init("peer1", 10) var peer2 = PeerTest.init("peer2", 9) @@ -462,7 +550,7 @@ suite "PeerPool testing suite": asyncSpawn testConsumer() check waitFor(testClose()) == true - test "Access peers by key test": closureScope([CatchableError]): + test "Access peers by key test": var pool = newPeerPool[PeerTest, PeerTestID]() var peer1 = PeerTest.init("peer1", 10) var peer2 = PeerTest.init("peer2", 9) @@ -591,6 +679,285 @@ suite "PeerPool testing suite": len(acqui2) == 2 len(acqui3) == 1 + asyncTest "Custom filters test": + var pool = newPeerPool[PeerTest, PeerTestID]() + let + peer1 = PeerTest.init("peer1", 10, 256'u64) + peer2 = PeerTest.init("peer2", 9, 0'u64) + peer3 = PeerTest.init("peer3", 8, 4'u64) + peer4 = PeerTest.init("peer4", 7, 2'u64) + peer5 = PeerTest.init("peer5", 6, 2'u64) + peer6 = PeerTest.init("peer6", 5, 2'u64) + peer7 = PeerTest.init("peer7", 4, 4'u64) + peer8 = PeerTest.init("peer8", 3, 128'u64) + peer9 = PeerTest.init("peer9", 2, 4'u64) + peer10 = PeerTest.init("peer10", 1, 256'u64) + + proc custom1(peer: PeerTest): bool = + true + + proc custom2(peer: PeerTest): bool = + if peer.getMetadata() == 2'u64: + true + else: + false + + proc custom3(peer: PeerTest): bool = + if 
peer.getMetadata() in [2'u64, 4'u64]: + true + else: + false + + check: + pool.addPeerNoWait(peer2, PeerType.Incoming) == PeerStatus.Success + pool.addPeerNoWait(peer3, PeerType.Incoming) == PeerStatus.Success + pool.addPeerNoWait(peer1, PeerType.Incoming) == PeerStatus.Success + pool.addPeerNoWait(peer4, PeerType.Incoming) == PeerStatus.Success + pool.addPeerNoWait(peer5, PeerType.Incoming) == PeerStatus.Success + + pool.addPeerNoWait(peer10, PeerType.Outgoing) == PeerStatus.Success + pool.addPeerNoWait(peer7, PeerType.Outgoing) == PeerStatus.Success + pool.addPeerNoWait(peer6, PeerType.Outgoing) == PeerStatus.Success + pool.addPeerNoWait(peer8, PeerType.Outgoing) == PeerStatus.Success + pool.addPeerNoWait(peer9, PeerType.Outgoing) == PeerStatus.Success + + template checkTotal() = + let + total1 = + pool.peers({PeerType.Incoming, PeerType.Outgoing}, custom1).toSeq() + total2 = + pool.peers({PeerType.Incoming}, custom1).toSeq() + total3 = + pool.peers({PeerType.Outgoing}, custom1).toSeq() + total4 = + pool.peers({PeerType.Incoming, PeerType.Outgoing}, custom2).toSeq() + total5 = + pool.peers({PeerType.Incoming}, custom2).toSeq() + total6 = + pool.peers({PeerType.Outgoing}, custom2).toSeq() + total7 = + pool.peers({PeerType.Incoming, PeerType.Outgoing}, custom3).toSeq() + total8 = + pool.peers({PeerType.Incoming}, custom3).toSeq() + total9 = + pool.peers({PeerType.Outgoing}, custom3).toSeq() + + check: + total1.toString() == + "[peer1,peer2,peer3,peer4,peer5,peer6,peer7,peer8,peer9,peer10]" + total2.toString() == "[peer1,peer2,peer3,peer4,peer5]" + total3.toString() == "[peer6,peer7,peer8,peer9,peer10]" + total4.toString() == "[peer4,peer5,peer6]" + total5.toString() == "[peer4,peer5]" + total6.toString() == "[peer6]" + total7.toString() == "[peer3,peer4,peer5,peer6,peer7,peer9]" + total8.toString() == "[peer3,peer4,peer5]" + total9.toString() == "[peer6,peer7,peer9]" + + checkTotal() + + block: + let + avail1 = + pool.availablePeers({PeerType.Incoming, 
PeerType.Outgoing}, + custom1).toSeq() + avail2 = + pool.availablePeers({PeerType.Incoming}, custom1).toSeq() + avail3 = + pool.availablePeers({PeerType.Outgoing}, custom1).toSeq() + avail4 = + pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, + custom2).toSeq() + avail5 = + pool.availablePeers({PeerType.Incoming}, custom2).toSeq() + avail6 = + pool.availablePeers({PeerType.Outgoing}, custom2).toSeq() + avail7 = + pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, + custom3).toSeq() + avail8 = + pool.availablePeers({PeerType.Incoming}, custom3).toSeq() + avail9 = + pool.availablePeers({PeerType.Outgoing}, custom3).toSeq() + + check: + avail1.toString() == + "[peer1,peer2,peer3,peer4,peer5,peer6,peer7,peer8,peer9,peer10]" + avail2.toString() == "[peer1,peer2,peer3,peer4,peer5]" + avail3.toString() == "[peer6,peer7,peer8,peer9,peer10]" + avail4.toString() == "[peer4,peer5,peer6]" + avail5.toString() == "[peer4,peer5]" + avail6.toString() == "[peer6]" + avail7.toString() == "[peer3,peer4,peer5,peer6,peer7,peer9]" + avail8.toString() == "[peer3,peer4,peer5]" + avail9.toString() == "[peer6,peer7,peer9]" + + let + tpeer1 = await pool.acquire({PeerType.Incoming, PeerType.Outgoing}, + custom1) + tpeer2 = await pool.acquire({PeerType.Incoming}, custom2) + tpeer3 = await pool.acquire({PeerType.Outgoing}, custom2) + tpeer4 = await pool.acquire({PeerType.Incoming}, custom3) + tpeer5 = await pool.acquire({PeerType.Outgoing}, custom3) + + check: + tpeer1.getKey() == "peer1" + tpeer2.getKey() == "peer4" + tpeer3.getKey() == "peer6" + tpeer4.getKey() == "peer3" + tpeer5.getKey() == "peer7" + + checkTotal() + + block: + let + avail1 = + pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, + custom1).toSeq() + avail2 = + pool.availablePeers({PeerType.Incoming}, custom1).toSeq() + avail3 = + pool.availablePeers({PeerType.Outgoing}, custom1).toSeq() + avail4 = + pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, + custom2).toSeq() + avail5 = + 
pool.availablePeers({PeerType.Incoming}, custom2).toSeq() + avail6 = + pool.availablePeers({PeerType.Outgoing}, custom2).toSeq() + avail7 = + pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, + custom3).toSeq() + avail8 = + pool.availablePeers({PeerType.Incoming}, custom3).toSeq() + avail9 = + pool.availablePeers({PeerType.Outgoing}, custom3).toSeq() + + check: + avail1.toString() == "[peer2,peer5,peer8,peer9,peer10]" + avail2.toString() == "[peer2,peer5]" + avail3.toString() == "[peer8,peer9,peer10]" + + avail4.toString() == "[peer5]" + avail5.toString() == "[peer5]" + avail6.toString() == "[]" + + avail7.toString() == "[peer5,peer9]" + avail8.toString() == "[peer5]" + avail9.toString() == "[peer9]" + + let + tpeer6 = await pool.acquire({PeerType.Incoming, PeerType.Outgoing}, + custom1) + tpeer7 = await pool.acquire({PeerType.Incoming}, custom2) + tpeer8 = await pool.acquire({PeerType.Outgoing}, custom3) + tpeer9 = await pool.acquire({PeerType.Outgoing}, custom1) + tpeer10 = await pool.acquire({PeerType.Outgoing}, custom1) + + check: + tpeer6.getKey() == "peer2" + tpeer7.getKey() == "peer5" + tpeer8.getKey() == "peer9" + tpeer9.getKey() == "peer8" + tpeer10.getKey() == "peer10" + + checkTotal() + + block: + let + avail1 = + pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, + custom1).toSeq() + avail2 = + pool.availablePeers({PeerType.Incoming}, custom1).toSeq() + avail3 = + pool.availablePeers({PeerType.Outgoing}, custom1).toSeq() + avail4 = + pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, + custom2).toSeq() + avail5 = + pool.availablePeers({PeerType.Incoming}, custom2).toSeq() + avail6 = + pool.availablePeers({PeerType.Outgoing}, custom2).toSeq() + avail7 = + pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, + custom3).toSeq() + avail8 = + pool.availablePeers({PeerType.Incoming}, custom3).toSeq() + avail9 = + pool.availablePeers({PeerType.Outgoing}, custom3).toSeq() + + check: + avail1.toString() == "[]" + 
avail2.toString() == "[]" + avail3.toString() == "[]" + + avail4.toString() == "[]" + avail5.toString() == "[]" + avail6.toString() == "[]" + + avail7.toString() == "[]" + avail8.toString() == "[]" + avail9.toString() == "[]" + + let + fut1 = pool.acquire({PeerType.Incoming}, custom2) + fut2 = pool.acquire({PeerType.Incoming}, custom2) + fut3 = pool.acquire({PeerType.Outgoing}, custom3) + fut4 = pool.acquire({PeerType.Outgoing}, custom3) + + check: + fut1.finished == false + fut2.finished == false + fut3.finished == false + fut4.finished == false + + pool.release(tpeer1) + await sleepAsync(100.milliseconds) + check: + fut1.finished == false + fut2.finished == false + fut3.finished == false + fut4.finished == false + + pool.release(tpeer2) + await sleepAsync(100.milliseconds) + check: + fut1.finished == true + fut1.value.getKey() == "peer4" + fut2.finished == false + fut3.finished == false + fut4.finished == false + + pool.release(tpeer3) + await sleepAsync(100.milliseconds) + check: + fut2.finished == false + fut3.finished == true + fut3.value.getKey() == "peer6" + fut4.finished == false + + pool.release(tpeer5) + await sleepAsync(100.milliseconds) + check: + fut2.finished == false + fut4.finished == true + fut4.value.getKey() == "peer7" + + pool.release(tpeer4) + pool.release(tpeer6) + pool.release(tpeer8) + pool.release(tpeer10) + await sleepAsync(100.milliseconds) + check: + fut2.finished == false + + pool.release(tpeer7) + await sleepAsync(100.milliseconds) + check: + fut2.finished == true + fut2.value.getKey() == "peer5" + test "Score check test": var pool = newPeerPool[PeerTest, PeerTestID]() func scoreCheck(peer: PeerTest): bool = @@ -889,8 +1256,8 @@ suite "PeerPool testing suite": pool7.lenSpace({PeerType.Incoming}) == 0 pool7.lenSpace({PeerType.Outgoing}) == high(int) - 39 - # We could not check whole high(int), so we check 10_000 items - for i in 0 ..< 10_000: + # We could not check whole high(int), so we check 1000 items + for i in 0 ..< 1000: check: 
pool7.addPeerNoWait(PeerTest.init("idOut" & $i), PeerType.Outgoing) == PeerStatus.Success @@ -914,8 +1281,8 @@ suite "PeerPool testing suite": pool8.lenSpace({PeerType.Outgoing}) == 0 pool8.lenSpace({PeerType.Incoming}) == high(int) - 40 - # We could not check whole high(int), so we check 10_000 items - for i in 0 ..< 10_000: + # We could not check whole high(int), so we check 1000 items + for i in 0 ..< 1000: check: pool8.addPeerNoWait(PeerTest.init("idInc" & $i), PeerType.Incoming) == PeerStatus.Success @@ -924,8 +1291,8 @@ suite "PeerPool testing suite": pool8.lenSpace({PeerType.Incoming}) == high(int) - 40 - (i + 1) # POOL 9 - # We could not check whole high(int), so we check 10_000 items - for i in 0 ..< 10_000: + # We could not check whole high(int), so we check 1000 items + for i in 0 ..< 1000: check: pool9.addPeerNoWait(PeerTest.init("idInc" & $i), PeerType.Incoming) == PeerStatus.Success