diff --git a/AllTests-mainnet.md b/AllTests-mainnet.md index f6e4aec23e..fc4487f37a 100644 --- a/AllTests-mainnet.md +++ b/AllTests-mainnet.md @@ -791,7 +791,6 @@ AllTests-mainnet + Access peers by key test OK + Acquire from empty pool OK + Acquire/Sorting and consistency test OK -+ Custom filters test OK + Delete peer on release text OK + Iterators test OK + Peer lifetime test OK diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 66b03dd651..b9d3d37970 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -480,12 +480,15 @@ func netKbps*(peer: Peer): float {.inline.} = ## Returns current network throughput average value in Kbps for peer ``peer``. round(((peer.netThroughput.average / 1024) * 10_000) / 10_000) -# /!\ Must be exported to be seen by `peerpool`. -func cmp*(a, b: Peer): int = - if a.score == b.score: - cmp(a.netThroughput.average, b.netThroughput.average) +# /!\ Must be exported to be seen by `peerCmp` +func `<`*(a, b: Peer): bool = + ## Comparison function indicating `true` if peer `a` ranks worse than peer `b` + if a.score != b.score: + a.score < b.score + elif a.netThroughput.average != b.netThroughput.average: + a.netThroughput.average < b.netThroughput.average else: - cmp(a.score, b.score) + system.`<`(a, b) const maxRequestQuota = 1000000 diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index 6cac3cde4a..611c86e304 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2018-2025 Status Research & Development GmbH +# Copyright (c) 2018-2024 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -7,7 +7,7 @@ {.push raises: [].} -import std/[tables, heapqueue, algorithm, sequtils] +import std/[tables, heapqueue] import chronos export tables @@ -19,6 +19,9 @@ type PeerFlags = enum Acquired, DeleteOnRelease + EventType = enum + NotEmptyEvent, NotFullEvent + PeerStatus* = enum Success, ## Peer was successfully added to PeerPool. DuplicateError, ## Peer is already present in PeerPool. @@ -26,13 +29,15 @@ type LowScoreError, ## Peer has too low score. DeadPeerError ## Peer is already dead. - PeerIndex = int - PeerItem[T] = object data: T peerType: PeerType flags: set[PeerFlags] - index: PeerIndex + index: int + + PeerIndex = object + data: int + cmp: proc(a, b: PeerIndex): bool {.gcsafe, raises: [].} PeerScoreCheckCallback*[T] = proc(peer: T): bool {.gcsafe, raises: [].} @@ -40,14 +45,16 @@ type PeerOnDeleteCallback*[T] = proc(peer: T) {.gcsafe, raises: [].} - PeerCustomFilterCallback*[T] = proc(peer: T): bool {.gcsafe, raises: [].} - PeerPool*[A, B] = ref object - changeEvent: AsyncEvent - storage: seq[PeerItem[A]] + incNotEmptyEvent*: AsyncEvent + outNotEmptyEvent*: AsyncEvent + incNotFullEvent*: AsyncEvent + outNotFullEvent*: AsyncEvent + incQueue: HeapQueue[PeerIndex] + outQueue: HeapQueue[PeerIndex] registry: Table[B, PeerIndex] - sorted: seq[PeerIndex] - empties: seq[PeerIndex] + storage: seq[PeerItem[A]] + cmp: proc(a, b: PeerIndex): bool {.gcsafe, raises: [].} scoreCheck: PeerScoreCheckCallback[A] onDeletePeer: PeerOnDeleteCallback[A] peerCounter: PeerCounterCallback @@ -61,48 +68,91 @@ type PeerPoolError* = object of CatchableError +proc `<`*(a, b: PeerIndex): bool = + ## PeerIndex ``a`` holds reference to ``cmp()`` procedure which has captured + ## PeerPool instance. + a.cmp(b, a) + +proc fireNotEmptyEvent[A, B](pool: PeerPool[A, B], + item: PeerItem[A]) = + case item.peerType: + of PeerType.Incoming: + pool.incNotEmptyEvent.fire() + of PeerType.Outgoing: + pool.outNotEmptyEvent.fire() + +proc fireNotFullEvent[A, B](pool: PeerPool[A, B], + item: PeerItem[A]) = + case item.peerType: + of PeerType.Incoming: + pool.incNotFullEvent.fire() + of PeerType.Outgoing: + pool.outNotFullEvent.fire() + iterator pairs*[A, B](pool: PeerPool[A, B]): (B, A) = - for peerId, pindex in pool.registry: - yield (peerId, pool.storage[pindex].data) - -proc resort[A, B]( - pool: PeerPool[A, B], - unsorted: openArray[PeerIndex] -): seq[PeerIndex] = - mixin `cmp` - proc pcmp(a, b: PeerIndex): int {.closure, raises: [].} = - cmp(pool.storage[a].data, pool.storage[b].data) - unsorted.sorted(pcmp, order = SortOrder.Descending) - -proc bsearch[A, B]( - pool: PeerPool[A, B], - sorted: openArray[PeerIndex], - index: PeerIndex -): int = - mixin `cmp` - proc pcmp(a, b: PeerIndex): int {.closure, raises: [].} = - cmp(pool.storage[a].data, pool.storage[b].data) - sorted.binarySearch(index, pcmp) - -proc addToStorage[A, B](pool: PeerPool[A, B], item: PeerItem[A]): PeerIndex = - var indexedItem = item - if len(pool.empties) > 0: - indexedItem.index = pool.empties[0] - pool.storage[indexedItem.index] = indexedItem - pool.empties.del(0) - else: - indexedItem.index = PeerIndex(len(pool.storage)) - pool.storage.add(indexedItem) - indexedItem.index - -proc newPeerPool*[A, B]( - maxPeers = -1, - maxIncomingPeers = -1, - maxOutgoingPeers = -1, - scoreCheckCb: PeerScoreCheckCallback[A] = nil, - peerCounterCb: PeerCounterCallback = nil, - onDeleteCb: PeerOnDeleteCallback[A] = nil -): PeerPool[A, B] = + for peerId, peerIdx in pool.registry: + yield (peerId, pool.storage[peerIdx.data].data) + +template incomingEvent(eventType: EventType): AsyncEvent = + case eventType + of EventType.NotEmptyEvent: + pool.incNotEmptyEvent + of EventType.NotFullEvent: + pool.incNotFullEvent + +template outgoingEvent(eventType: EventType): AsyncEvent = + case eventType + of EventType.NotEmptyEvent: + pool.outNotEmptyEvent + of EventType.NotFullEvent: + pool.outNotFullEvent + +proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType, + filter: set[PeerType]) {.async: (raises: [CancelledError]).} = + if filter == {PeerType.Incoming, PeerType.Outgoing} or filter == {}: + var fut1 = incomingEvent(eventType).wait() + var fut2 = outgoingEvent(eventType).wait() + try: + try: + discard await one(fut1, fut2) + except ValueError: + raiseAssert "one precondition satisfied" + if fut1.finished(): + if not(fut2.finished()): + await fut2.cancelAndWait() + incomingEvent(eventType).clear() + else: + if not(fut1.finished()): + await fut1.cancelAndWait() + outgoingEvent(eventType).clear() + except CancelledError as exc: + var pending: seq[FutureBase] + if not(fut1.finished()): + pending.add(fut1.cancelAndWait()) + if not(fut2.finished()): + pending.add(fut2.cancelAndWait()) + await noCancel allFutures(pending) + raise exc + elif PeerType.Incoming in filter: + await incomingEvent(eventType).wait() + incomingEvent(eventType).clear() + elif PeerType.Outgoing in filter: + await outgoingEvent(eventType).wait() + outgoingEvent(eventType).clear() + +proc waitNotEmptyEvent[A, B](pool: PeerPool[A, B], + filter: set[PeerType]) {.async: (raises: [CancelledError], raw: true).} = + pool.waitForEvent(EventType.NotEmptyEvent, filter) + +proc waitNotFullEvent[A, B](pool: PeerPool[A, B], + filter: set[PeerType]){.async: (raises: [CancelledError], raw: true).} = + pool.waitForEvent(EventType.NotFullEvent, filter) + +proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1, + maxOutgoingPeers = -1, + scoreCheckCb: PeerScoreCheckCallback[A] = nil, + peerCounterCb: PeerCounterCallback = nil, + onDeleteCb: PeerOnDeleteCallback[A] = nil): PeerPool[A, B] = ## Create new PeerPool. ## ## ``maxPeers`` - maximum number of peers allowed. All the peers which @@ -128,35 +178,40 @@ proc newPeerPool*[A, B]( ## ## Please note, that if ``maxPeers`` is positive non-zero value, then equation ## ``maxPeers >= maxIncomingPeers + maxOutgoingPeers`` must be ``true``. + var res = PeerPool[A, B]() if maxPeers != -1: doAssert(maxPeers >= maxIncomingPeers + maxOutgoingPeers) - let - maxPeersCount = if maxPeers < 0: high(int) else: maxPeers - maxIncPeersCount = - if maxIncomingPeers < 0: - high(int) - else: - maxIncomingPeers - maxOutPeersCount = - if maxOutgoingPeers < 0: - high(int) - else: - maxOutgoingPeers - res = PeerPool[A, B]( - changeEvent: newAsyncEvent(), - registry: initTable[B, PeerIndex](), - scoreCheck: scoreCheckCb, - peerCounter: peerCounterCb, - onDeletePeer: onDeleteCb, - maxPeersCount: maxPeersCount, - maxIncPeersCount: maxIncPeersCount, - maxOutPeersCount: maxOutPeersCount, - curIncPeersCount: 0, - curOutPeersCount: 0, - acqIncPeersCount: 0, - acqOutPeersCount: 0 - ) + res.maxPeersCount = if maxPeers < 0: high(int) else: maxPeers + res.maxIncPeersCount = + if maxIncomingPeers < 0: + high(int) + else: + maxIncomingPeers + res.maxOutPeersCount = + if maxOutgoingPeers < 0: + high(int) + else: + maxOutgoingPeers + + res.incNotEmptyEvent = newAsyncEvent() + res.outNotEmptyEvent = newAsyncEvent() + res.incNotFullEvent = newAsyncEvent() + res.outNotFullEvent = newAsyncEvent() + res.incQueue = initHeapQueue[PeerIndex]() + res.outQueue = initHeapQueue[PeerIndex]() + res.registry = initTable[B, PeerIndex]() + res.scoreCheck = scoreCheckCb + res.peerCounter = peerCounterCb + res.onDeletePeer = onDeleteCb + res.storage = newSeq[PeerItem[A]]() + + proc peerCmp(a, b: PeerIndex): bool {.closure, gcsafe.} = + let p1 = res.storage[a.data].data + let p2 = res.storage[b.data].data + p1 < p2 + + res.cmp = peerCmp res proc len*[A, B](pool: PeerPool[A, B]): int = @@ -166,86 +221,50 @@ proc len*[A, B](pool: PeerPool[A, B]): int = proc lenCurrent*[A, B](pool: PeerPool[A, B], filter = {PeerType.Incoming, - PeerType.Outgoing}): int = + PeerType.Outgoing}): int {.inline.} = ## Returns number of registered peers in PeerPool ``pool`` which satisfies ## filter ``filter``. (if PeerType.Incoming in filter: pool.curIncPeersCount else: 0) + (if PeerType.Outgoing in filter: pool.curOutPeersCount else: 0) -proc lenAvailable*[A, B]( - pool: PeerPool[A, B], - filter = {PeerType.Incoming, PeerType.Outgoing} -): int = - ## Returns number of peers available for acquisition in PeerPool - ## ``pool`` which satisfies filter ``filter``. - (if PeerType.Incoming in filter: - pool.curIncPeersCount - pool.acqIncPeersCount - else: - 0) + - (if PeerType.Outgoing in filter: - pool.curOutPeersCount - pool.acqOutPeersCount - else: - 0) - -proc lenAvailable*[A, B]( - pool: PeerPool[A, B], - filter: set[PeerType], - customFilter: PeerCustomFilterCallback[A] -): int = - ## Returns number of peers available for acquisition in PeerPool - ## ``pool`` which satisfies filter ``filter`` and custom filter - ## ``customFilter``. - ## Note: This is O(n) operation. - let available = pool.lenAvailable(filter) - var res = 0 - for pindex in pool.sorted.items(): - let item = addr(pool.storage[pindex]) - if (PeerFlags.Acquired notin item[].flags) and - (item[].peerType in filter) and - (isNil(customFilter) or customFilter(item[].data)): - inc(res) - if res == available: - # Number of customly filtered items could not be higher than number of - # peers of specific directions. - break - res +proc lenAvailable*[A, B](pool: PeerPool[A, B], + filter = {PeerType.Incoming, + PeerType.Outgoing}): int {.inline.} = + ## Returns number of available peers in PeerPool ``pool`` which satisfies + ## filter ``filter``. + (if PeerType.Incoming in filter: len(pool.incQueue) else: 0) + + (if PeerType.Outgoing in filter: len(pool.outQueue) else: 0) -proc lenAcquired*[A, B]( - pool: PeerPool[A, B], - filter = {PeerType.Incoming, PeerType.Outgoing} -): int = +proc lenAcquired*[A, B](pool: PeerPool[A, B], + filter = {PeerType.Incoming, + PeerType.Outgoing}): int {.inline.} = ## Returns number of acquired peers in PeerPool ``pool`` which satisifies ## filter ``filter``. (if PeerType.Incoming in filter: pool.acqIncPeersCount else: 0) + (if PeerType.Outgoing in filter: pool.acqOutPeersCount else: 0) -proc lenSpace*[A, B]( - pool: PeerPool[A, B], - filter = {PeerType.Incoming, PeerType.Outgoing} -): int = +proc lenSpace*[A, B](pool: PeerPool[A, B], + filter = {PeerType.Incoming, + PeerType.Outgoing}): int {.inline.} = ## Returns number of available space for peers in PeerPool ``pool`` which ## satisfies filter ``filter``. - let - curPeersCount = pool.curIncPeersCount + pool.curOutPeersCount - spaceAvailable = pool.maxPeersCount - curPeersCount - incoming = min(spaceAvailable, - pool.maxIncPeersCount - pool.curIncPeersCount) - outgoing = min(spaceAvailable, - pool.maxOutPeersCount - pool.curOutPeersCount) + let curPeersCount = pool.curIncPeersCount + pool.curOutPeersCount + let totalSpace = pool.maxPeersCount - curPeersCount + let incoming = min(totalSpace, pool.maxIncPeersCount - pool.curIncPeersCount) + let outgoing = min(totalSpace, pool.maxOutPeersCount - pool.curOutPeersCount) if filter == {PeerType.Incoming, PeerType.Outgoing}: # To avoid overflow check we need to check by ourself. if uint64(incoming) + uint64(outgoing) > uint64(high(int)): - min(spaceAvailable, high(int)) + min(totalSpace, high(int)) else: - min(spaceAvailable, incoming + outgoing) + min(totalSpace, incoming + outgoing) elif PeerType.Incoming in filter: incoming else: outgoing proc shortLogAvailable*[A, B](pool: PeerPool[A, B]): string = - $pool.lenAvailable({PeerType.Incoming}) & "/" & - $pool.lenAvailable({PeerType.Outgoing}) + $len(pool.incQueue) & "/" & $len(pool.outQueue) proc shortLogAcquired*[A, B](pool: PeerPool[A, B]): string = $pool.acqIncPeersCount & "/" & $pool.acqOutPeersCount @@ -274,26 +293,6 @@ proc peerDeleted[A, B](pool: PeerPool[A, B], peer: A) = if not(isNil(pool.onDeletePeer)): pool.onDeletePeer(peer) -proc deletePeerImpl[A, B]( - pool: PeerPool[A, B], - peer: A, - key: B, - pindex: PeerIndex -) = - let sindex = pool.bsearch(pool.sorted, pindex) - pool.storage[pindex] = PeerItem[A](index: PeerIndex(-1)) - pool.empties.add(pindex) - pool.registry.del(key) - if sindex >= 0: - # sindex == 0 when deleting peer which was acquired (not in `sorted` array). - pool.sorted = pool.resort(pool.sorted) - pool.sorted.del(sindex) - - # Indicate that we have an empty space - pool.changeEvent.fire() - pool.peerDeleted(peer) - pool.peerCountChanged() - proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = ## Remove ``peer`` from PeerPool ``pool``. ## @@ -301,60 +300,78 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = ## be deleted only when peer will be released. You can change this behavior ## with ``force`` option. mixin getKey - let - key = peer.getKey() - pindex = - block: - let res = pool.registry.getOrDefault(key, PeerIndex(-1)) - if res == PeerIndex(-1): - return false - res - - var item = addr(pool.storage[pindex]) - if (PeerFlags.Acquired in item[].flags): - if not(force): - item[].flags.incl(PeerFlags.DeleteOnRelease) + let key = getKey(peer) + if pool.registry.hasKey(key): + let pindex = try: pool.registry[key].data + except KeyError: raiseAssert "checked with hasKey" + var item = addr(pool.storage[pindex]) + if (PeerFlags.Acquired in item[].flags): + if not(force): + item[].flags.incl(PeerFlags.DeleteOnRelease) + else: + if item[].peerType == PeerType.Incoming: + dec(pool.curIncPeersCount) + dec(pool.acqIncPeersCount) + elif item[].peerType == PeerType.Outgoing: + dec(pool.curOutPeersCount) + dec(pool.acqOutPeersCount) + + # Indicate that we have an empty space + pool.fireNotFullEvent(item[]) + # Cleanup storage with default item, and removing key from hashtable. + pool.storage[pindex] = PeerItem[A]() + pool.registry.del(key) + pool.peerDeleted(peer) + pool.peerCountChanged() else: - case item[].peerType - of PeerType.Incoming: - dec(pool.acqIncPeersCount) + if item[].peerType == PeerType.Incoming: + # If peer is available, then its copy present in heapqueue, so we need + # to remove it. + for i in 0 ..< len(pool.incQueue): + if pool.incQueue[i].data == pindex: + pool.incQueue.del(i) + break dec(pool.curIncPeersCount) - of PeerType.Outgoing: - dec(pool.acqOutPeersCount) + elif item[].peerType == PeerType.Outgoing: + # If peer is available, then its copy present in heapqueue, so we need + # to remove it. + for i in 0 ..< len(pool.outQueue): + if pool.outQueue[i].data == pindex: + pool.outQueue.del(i) + break dec(pool.curOutPeersCount) - pool.deletePeerImpl(peer, key, pindex) - else: - case item[].peerType - of PeerType.Incoming: - dec(pool.curIncPeersCount) - of PeerType.Outgoing: - dec(pool.curOutPeersCount) - pool.deletePeerImpl(peer, key, pindex) - true + # Indicate that we have an empty space + pool.fireNotFullEvent(item[]) + # Cleanup storage with default item, and removing key from hashtable. + pool.storage[pindex] = PeerItem[A]() + pool.registry.del(key) + pool.peerDeleted(peer) + pool.peerCountChanged() + true + else: + false proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B, peerType: PeerType) = - mixin getFuture proc onPeerClosed(udata: pointer) {.gcsafe, raises: [].} = discard pool.deletePeer(peer) - let - item = PeerItem[A](data: peer, peerType: peerType) - pindex = pool.addToStorage(item) - pitem = addr(pool.storage[pindex]) - + let item = PeerItem[A](data: peer, peerType: peerType, + index: len(pool.storage)) + pool.storage.add(item) + var pitem = addr(pool.storage[^1]) + let pindex = PeerIndex(data: item.index, cmp: pool.cmp) pool.registry[peerKey] = pindex - pool.sorted.add(pindex) - pool.sorted = pool.resort(pool.sorted) - pitem[].data.getFuture().addCallback(onPeerClosed) - case peerType - of PeerType.Incoming: + if peerType == PeerType.Incoming: inc(pool.curIncPeersCount) - of PeerType.Outgoing: + pool.incQueue.push(pindex) + pool.incNotEmptyEvent.fire() + elif peerType == PeerType.Outgoing: inc(pool.curOutPeersCount) - pool.changeEvent.fire() + pool.outQueue.push(pindex) + pool.outNotEmptyEvent.fire() pool.peerCountChanged() proc checkPeer*[A, B](pool: PeerPool[A, B], peer: A): PeerStatus {.inline.} = @@ -378,11 +395,8 @@ proc checkPeer*[A, B](pool: PeerPool[A, B], peer: A): PeerStatus {.inline.} = else: PeerStatus.DuplicateError -proc addPeerNoWait*[A, B]( - pool: PeerPool[A, B], - peer: A, - peerType: PeerType -): PeerStatus = +proc addPeerNoWait*[A, B](pool: PeerPool[A, B], + peer: A, peerType: PeerType): PeerStatus = ## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``. ## ## Procedure returns ``PeerStatus`` @@ -414,21 +428,41 @@ proc addPeerNoWait*[A, B]( else: PeerStatus.NoSpaceError -proc waitForEmptySpace*[A, B]( - pool: PeerPool[A, B], - peerType: PeerType -) {.async: (raises: [CancelledError]).} = +proc getPeerSpaceMask[A, B](pool: PeerPool[A, B], + peerType: PeerType): set[PeerType] {.inline.} = + ## This procedure returns set of events which you need to wait to get empty + ## space for peer type ``peerType``. This set can be used for call to + ## ``waitNotFullEvent()``. + case peerType: + of PeerType.Incoming: + if pool.maxIncPeersCount >= pool.maxPeersCount: + # If maximum number of `incoming` peers is only limited by + # maximum number of peers, then we could wait for both events. + # It means that we do not care about what peer will left pool. + {PeerType.Incoming, PeerType.Outgoing} + else: + # Otherwise we could wait only for `incoming` event + {PeerType.Incoming} + of PeerType.Outgoing: + if pool.maxOutPeersCount >= pool.maxPeersCount: + # If maximum number of `outgoing` peers is only limited by + # maximum number of peers, then we could wait for both events. + # It means that we do not care about what peer will left pool. + {PeerType.Incoming, PeerType.Outgoing} + else: + # Otherwise we could wait only for `outgoing` event + {PeerType.Outgoing} + +proc waitForEmptySpace*[A, B](pool: PeerPool[A, B], + peerType: PeerType) {.async: (raises: [CancelledError]).} = ## This procedure will block until ``pool`` will have an empty space for peer ## of type ``peerType``. + let mask = pool.getPeerSpaceMask(peerType) while pool.lenSpace({peerType}) == 0: - await pool.changeEvent.wait() - pool.changeEvent.clear() + await pool.waitNotFullEvent(mask) -proc addPeer*[A, B]( - pool: PeerPool[A, B], - peer: A, - peerType: PeerType -): Future[PeerStatus] {.async: (raises: [CancelledError]).} = +proc addPeer*[A, B](pool: PeerPool[A, B], + peer: A, peerType: PeerType): Future[PeerStatus] {.async: (raises: [CancelledError]).} = ## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``. ## ## This procedure will wait for an empty space in PeerPool ``pool``, if @@ -440,176 +474,134 @@ proc addPeer*[A, B]( ## ## Procedure returns (PeerStatus.Success) on success. mixin getKey - - template check(peer: untyped) = - let res = pool.checkPeer(peer) - if res != PeerStatus.Success: - return res - - while pool.lenSpace({peerType}) == 0: - peer.check() - await pool.changeEvent.wait() - pool.changeEvent.clear() - - # Because we could wait for a long time we need to check peer one more - # time to avoid race condition. - peer.check() - - pool.addPeerImpl(peer, peer.getKey(), peerType) - PeerStatus.Success - -proc acquireItemImpl[A, B]( - pool: PeerPool[A, B], - filter: set[PeerType], - customFilter: PeerCustomFilterCallback[A] = nil -): A = - let (sindex, pitem) = + let res = block: - var - rindex: PeerIndex = PeerIndex(-1) - res: ptr PeerItem[A] = nil - for sindex, pindex in pool.sorted.pairs(): - res = addr(pool.storage[pindex]) - if (PeerFlags.Acquired notin res[].flags) and - (res[].peerType in filter) and - (isNil(customFilter) or customFilter(res[].data)): - rindex = PeerIndex(sindex) - break - (rindex, res) - - doAssert(not(isNil(pitem))) + let res1 = pool.checkPeer(peer) + if res1 != PeerStatus.Success: + res1 + else: + let mask = pool.getPeerSpaceMask(peerType) + # We going to block here until ``pool`` will not have free space, + # for our type of peer. + while pool.lenSpace({peerType}) == 0: + await pool.waitNotFullEvent(mask) + # Because we could wait for a long time we need to check peer one more + # time to avoid race condition. + let res2 = pool.checkPeer(peer) + if res2 == PeerStatus.Success: + let peerKey = peer.getKey() + pool.addPeerImpl(peer, peerKey, peerType) + PeerStatus.Success + else: + res2 + return res + +proc acquireItemImpl[A, B](pool: PeerPool[A, B], + filter: set[PeerType]): A {.inline.} = + doAssert((len(pool.outQueue) > 0) or (len(pool.incQueue) > 0)) + let pindex = + if filter == {PeerType.Incoming, PeerType.Outgoing}: + if len(pool.outQueue) > 0 and len(pool.incQueue) > 0: + # `<` here is the `PeerIndex` implementation (`HeapQueue` uses `<`), + # which then flips the arguments to rank `>` on `A` using `pool.cmp` + if pool.incQueue[0] < pool.outQueue[0]: + inc(pool.acqIncPeersCount) + let item = pool.incQueue.pop() + item.data + else: + inc(pool.acqOutPeersCount) + let item = pool.outQueue.pop() + item.data + else: + if len(pool.outQueue) > 0: + inc(pool.acqOutPeersCount) + let item = pool.outQueue.pop() + item.data + else: + inc(pool.acqIncPeersCount) + let item = pool.incQueue.pop() + item.data + else: + if PeerType.Outgoing in filter: + inc(pool.acqOutPeersCount) + let item = pool.outQueue.pop() + item.data + else: + inc(pool.acqIncPeersCount) + let item = pool.incQueue.pop() + item.data + var pitem = addr(pool.storage[pindex]) doAssert(PeerFlags.Acquired notin pitem[].flags) - - case pitem[].peerType - of PeerType.Incoming: - inc(pool.acqIncPeersCount) - of PeerType.Outgoing: - inc(pool.acqOutPeersCount) - - pool.sorted.del(sindex) - pool.sorted = pool.resort(pool.sorted) - pitem[].flags.incl(PeerFlags.Acquired) pitem[].data -proc acquire*[A, B]( - pool: PeerPool[A, B], - filter = {PeerType.Incoming, PeerType.Outgoing} -): Future[A] {.async: (raises: [CancelledError]).} = +proc acquire*[A, B](pool: PeerPool[A, B], + filter = {PeerType.Incoming, + PeerType.Outgoing}): Future[A] {.async: (raises: [CancelledError]).} = ## Acquire peer from PeerPool ``pool``, which match the filter ``filter``. - ## This procedure will wait for peer which satisfy filter will become - ## available for acquisition. mixin getKey doAssert(filter != {}, "Filter must not be empty") while true: if pool.lenAvailable(filter) == 0: - await pool.changeEvent.wait() - pool.changeEvent.clear() + await pool.waitNotEmptyEvent(filter) else: - return pool.acquireItemImpl(filter, nil) - -proc acquire*[A, B]( - pool: PeerPool[A, B], - filter: set[PeerType], - customFilter: PeerCustomFilterCallback[A] -): Future[A] {.async: (raises: [CancelledError]).} = - ## Acquire peer from PeerPool ``pool``, which match the filter ``filter`` and - ## custom filter ``customFilter``. This procedure will wait for peer which - ## satisfy filters will become available for acquisition. - mixin getKey - doAssert(filter != {}, "Filter must not be empty") - while true: - if pool.lenAvailable(filter, customFilter) == 0: - await pool.changeEvent.wait() - pool.changeEvent.clear() - else: - return pool.acquireItemImpl(filter, customFilter) - -proc acquireNoWait*[A, B]( - pool: PeerPool[A, B], - filter = {PeerType.Incoming, PeerType.Outgoing} -): A {.raises: [PeerPoolError].} = - ## Acquire peer from PeerPool ``pool``, which match the filter ``filter`` - ## without waiting, this procedure will raise PeerPoolError if no peers - ## which satisfy filters are available for acquisition. + return pool.acquireItemImpl(filter) + +proc acquireNoWait*[A, B](pool: PeerPool[A, B], + filter = {PeerType.Incoming, + PeerType.Outgoing} + ): A {.raises: [PeerPoolError].} = doAssert(filter != {}, "Filter must not be empty") if pool.lenAvailable(filter) < 1: raise newException(PeerPoolError, "Not enough peers in pool") - pool.acquireItemImpl(filter, nil) - -proc acquireNoWait*[A, B]( - pool: PeerPool[A, B], - filter: set[PeerType], - customFilter: PeerCustomFilterCallback[A] -): A {.raises: [PeerPoolError].} = - ## Acquire peer from PeerPool ``pool``, which match the filter ``filter`` and - ## custom filter ``customFilter`` without waiting, this procedure will raise - ## PeerPoolError if no peers which satisfy filters are available for - ## acquisition. - doAssert(filter != {}, "Filter must not be empty") - if pool.lenAvailable(filter, customFilter) < 1: - raise newException(PeerPoolError, "Not enough peers in pool") - pool.acquireItemImpl(filter, customFilter) + pool.acquireItemImpl(filter) proc release*[A, B](pool: PeerPool[A, B], peer: A) = ## Release peer ``peer`` back to PeerPool ``pool`` mixin getKey - let - key = peer.getKey() - pindex = - block: - let res = pool.registry.getOrDefault(key, PeerIndex(-1)) - if res == PeerIndex(-1): - return - res - item = addr(pool.storage[pindex]) - - if PeerFlags.Acquired in item[].flags: - if not(pool.checkPeerScore(peer)): - item[].flags.incl(DeleteOnRelease) - if PeerFlags.DeleteOnRelease in item[].flags: - case item[].peerType - of PeerType.Incoming: - dec(pool.acqIncPeersCount) - dec(pool.curIncPeersCount) - of PeerType.Outgoing: - dec(pool.acqOutPeersCount) - dec(pool.curOutPeersCount) - pool.deletePeerImpl(peer, key, pindex) - else: - item[].flags.excl(PeerFlags.Acquired) - case item[].peerType - of PeerType.Incoming: - dec(pool.acqIncPeersCount) - of PeerType.Outgoing: - dec(pool.acqOutPeersCount) - - pool.sorted.add(pindex) - pool.sorted = pool.resort(pool.sorted) - pool.changeEvent.fire() - -proc release*[A, B](pool: PeerPool[A, B], peers: openArray[A]) = + let key = getKey(peer) + var titem = pool.registry.getOrDefault(key, PeerIndex(data: -1)) + if titem.data >= 0: + let pindex = titem.data + var item = addr(pool.storage[pindex]) + if PeerFlags.Acquired in item[].flags: + if not(pool.checkPeerScore(peer)): + item[].flags.incl(DeleteOnRelease) + if PeerFlags.DeleteOnRelease in item[].flags: + # We do not care about result here because peer is present in registry + # and has all proper flags set. + discard pool.deletePeer(peer, force = true) + else: + item[].flags.excl(PeerFlags.Acquired) + case item[].peerType + of PeerType.Incoming: + pool.incQueue.push(titem) + dec(pool.acqIncPeersCount) + of PeerType.Outgoing: + pool.outQueue.push(titem) + dec(pool.acqOutPeersCount) + pool.fireNotEmptyEvent(item[]) + +proc release*[A, B](pool: PeerPool[A, B], peers: openArray[A]) {.inline.} = ## Release array of peers ``peers`` back to PeerPool ``pool``. for item in peers: pool.release(item) -proc acquire*[A, B]( - pool: PeerPool[A, B], - number: int, - filter = {PeerType.Incoming, PeerType.Outgoing} -): Future[seq[A]] {.async: (raises: [CancelledError]).} = +proc acquire*[A, B](pool: PeerPool[A, B], + number: int, + filter = {PeerType.Incoming, + PeerType.Outgoing}): Future[seq[A]] {.async: (raises: [CancelledError]).} = ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the ## filter ``filter``. doAssert(filter != {}, "Filter must not be empty") - var peers: seq[A] + var peers = newSeq[A]() try: if number > 0: while true: if len(peers) >= number: break if pool.lenAvailable(filter) == 0: - await pool.changeEvent.wait() - pool.changeEvent.clear() + await pool.waitNotEmptyEvent(filter) else: peers.add(pool.acquireItemImpl(filter)) except CancelledError as exc: @@ -619,235 +611,96 @@ proc acquire*[A, B]( pool.release(item) peers.setLen(0) raise exc - peers - -proc acquire*[A, B]( - pool: PeerPool[A, B], - number: int, - filter: set[PeerType], - customFilter: PeerCustomFilterCallback[A] -): Future[seq[A]] {.async: (raises: [CancelledError]).} = - ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the - ## filter ``filter`` and custom filter ``customFilter``. This procedure will - ## wait for ``number`` of peers which satisfy filter will become available - ## and acquired. - doAssert(filter != {}, "Filter must not be empty") - var peers: seq[A] - try: - if number > 0: - while true: - if len(peers) >= number: - break - if pool.lenAvailable(filter, customFilter) == 0: - await pool.changeEvent.wait() - pool.changeEvent.clear() - else: - peers.add(pool.acquireItemImpl(filter, customFilter)) - except CancelledError as exc: - # If we got cancelled, we need to return all the acquired peers back to - # pool. - for item in peers: - pool.release(item) - peers.setLen(0) - raise exc - peers + return peers -proc acquireNoWait*[A, B]( - pool: PeerPool[A, B], - number: int, - filter = {PeerType.Incoming, PeerType.Outgoing} -): seq[A] = +proc acquireNoWait*[A, B](pool: PeerPool[A, B], + number: int, + filter = {PeerType.Incoming, + PeerType.Outgoing}): seq[A] = ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the - ## filter ``filter``. This procedure does not wait for peers, it will raise - ## `PeerPoolError` if peers matching the filters are not available. + ## filter ``filter``. doAssert(filter != {}, "Filter must not be empty") - var peers: seq[A] + var peers = newSeq[A]() if pool.lenAvailable(filter) < number: raise newException(PeerPoolError, "Not enough peers in pool") for i in 0 ..< number: peers.add(pool.acquireItemImpl(filter)) - peers - -proc acquireNoWait*[A, B]( - pool: PeerPool[A, B], - number: int, - filter: set[PeerType], - customFilter: PeerCustomFilterCallback[A] -): seq[A] = - ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the - ## filter ``filter`` and custom filter ``filter``. This procedure does not - ## wait for peers, it will raise `PeerPoolError` if peers matching the - ## filters are not available. - doAssert(filter != {}, "Filter must not be empty") - var peers: seq[A] - if pool.lenAvailable(filter, customFilter) < number: - raise newException(PeerPoolError, "Not enough peers in pool") - for i in 0 ..< number: - peers.add(pool.acquireItemImpl(filter, customFilter)) - peers + return peers -proc acquireIncomingPeer*[A, B]( - pool: PeerPool[A, B] -): Future[A] {.async: (raises: [CancelledError], raw: true).} = +proc acquireIncomingPeer*[A, B](pool: PeerPool[A, B]): Future[A] {.inline.} = ## Acquire single incoming peer from PeerPool ``pool``. pool.acquire({PeerType.Incoming}) -proc acquireOutgoingPeer*[A, B]( - pool: PeerPool[A, B] -): Future[A] {.async: (raises: [CancelledError], raw: true).} = +proc acquireOutgoingPeer*[A, B](pool: PeerPool[A, B]): Future[A] {.inline.} = ## Acquire single outgoing peer from PeerPool ``pool``. pool.acquire({PeerType.Outgoing}) -proc acquireIncomingPeers*[A, B]( - pool: PeerPool[A, B], - number: int -): Future[seq[A]] {.async: (raises: [CancelledError], raw: true).} = +proc acquireIncomingPeers*[A, B](pool: PeerPool[A, B], + number: int): Future[seq[A]] {.inline.} = ## Acquire ``number`` number of incoming peers from PeerPool ``pool``. pool.acquire(number, {PeerType.Incoming}) -proc acquireOutgoingPeers*[A, B]( - pool: PeerPool[A, B], - number: int -): Future[seq[A]] {.async: (raises: [CancelledError], raw: true).} = +proc acquireOutgoingPeers*[A, B](pool: PeerPool[A, B], + number: int): Future[seq[A]] {.inline.} = ## Acquire ``number`` number of outgoing peers from PeerPool ``pool``. pool.acquire(number, {PeerType.Outgoing}) -iterator peers*[A, B]( - pool: PeerPool[A, B], - filter = {PeerType.Incoming, PeerType.Outgoing} -): A = - ## Iterate over sorted list of peers. - ## - ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values - ## will be first. - ## - ## NOTE: While it safe to use this iterator in combination with await calls, - ## consider that right after `await` call, PeerPool could become different - ## from the snapshot this iterator provides. - var unsorted: seq[PeerIndex] - for pindex in pool.registry.values(): - if pool.storage[pindex].peerType in filter: - unsorted.add(pindex) - - # We allocate new sequence here to avoid problems with missing indices when - # await operation could be part of iteration. - let sortedPeers = pool.resort(unsorted).mapIt(pool.storage[it].data) - for peer in sortedPeers: - yield peer - -iterator peers*[A, B]( - pool: PeerPool[A, B], - filter: set[PeerType], - customFilter: PeerCustomFilterCallback[A] -): A = +iterator peers*[A, B](pool: PeerPool[A, B], + filter = {PeerType.Incoming, + PeerType.Outgoing}): A = ## Iterate over sorted list of peers. ## ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. - ## - ## NOTE: While it safe to use this iterator in combination with await calls, - ## consider that right after `await` call, PeerPool could become different - ## from the snapshot this iterator provides. - var unsorted: seq[PeerIndex] - for pindex in pool.registry.values(): - let item = addr(pool.storage[pindex]) - if (item[].peerType in filter) and - (isNil(customFilter) or customFilter(item[].data)): - unsorted.add(pindex) - - # We allocate new sequence here to avoid problems with missing indices when - # await operation could be part of iteration. - let sortedPeers = pool.resort(unsorted).mapIt(pool.storage[it].data) - for peer in sortedPeers: - yield peer - -iterator availablePeers*[A, B]( - pool: PeerPool[A, B], - filter = {PeerType.Incoming, PeerType.Outgoing} -): A = + var sorted = initHeapQueue[PeerIndex]() + for peerIdx in pool.registry.values(): + if pool.storage[peerIdx.data].peerType in filter: + sorted.push(peerIdx) + while len(sorted) > 0: + let peerIdx = sorted.pop() + yield pool.storage[peerIdx.data].data + +iterator availablePeers*[A, B](pool: PeerPool[A, B], + filter = {PeerType.Incoming, + PeerType.Outgoing}): A = ## Iterate over sorted list of available peers. ## ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. - ## - ## NOTE: While it safe to use this iterator in combination with await calls, - ## consider that right after `await` call, PeerPool could become different - ## from the snapshot this iterator provides. - - # We allocate new sequence here to avoid problems with missing indices when - # await operation could be part of iteration. - let sortedPeers = - pool.sorted.filterIt( - (PeerFlags.Acquired notin pool.storage[it].flags) and - (pool.storage[it].peerType in filter)). - mapIt(pool.storage[it].data) - - for peer in sortedPeers: - yield peer - -iterator availablePeers*[A, B]( - pool: PeerPool[A, B], - filter: set[PeerType], - customFilter: PeerCustomFilterCallback[A] -): A = - ## Iterate over sorted list of available peers. - ## - ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values - ## will be first. - ## - ## NOTE: While it safe to use this iterator in combination with await calls, - ## consider that right after `await` call, PeerPool could become different - ## from the snapshot this iterator provides. - - # We allocate new sequence here to avoid problems with missing indices when - # await operation could be part of iteration. - let sortedPeers = - pool.sorted.filterIt( - (PeerFlags.Acquired notin pool.storage[it].flags) and - (pool.storage[it].peerType in filter) and - (isNil(customFilter) or customFilter(pool.storage[it].data))). - mapIt(pool.storage[it].data) - - for peer in sortedPeers: - yield peer - -iterator acquiredPeers*[A, B]( - pool: PeerPool[A, B], - filter = {PeerType.Incoming, PeerType.Outgoing} -): A = + var sorted = initHeapQueue[PeerIndex]() + for peerIdx in pool.registry.values(): + if (PeerFlags.Acquired notin pool.storage[peerIdx.data].flags) and + (pool.storage[peerIdx.data].peerType in filter): + sorted.push(peerIdx) + while len(sorted) > 0: + let peerIdx = sorted.pop() + yield pool.storage[peerIdx.data].data + +iterator acquiredPeers*[A, B](pool: PeerPool[A, B], + filter = {PeerType.Incoming, + PeerType.Outgoing}): A = ## Iterate over sorted list of acquired (non-available) peers. ## ## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values ## will be first. - ## - ## NOTE: While it safe to use this iterator in combination with await calls, - ## consider that right after `await` call, PeerPool could become different - ## from the snapshot this iterator provides. - var unsorted: seq[PeerIndex] - for pindex in pool.registry.values(): - if (PeerFlags.Acquired in pool.storage[pindex].flags) and - (pool.storage[pindex].peerType in filter): - unsorted.add(pindex) - - # We allocate new sequence here to avoid problems with missing indices when - # await operation could be part of iteration. - let sortedPeers = pool.resort(unsorted).mapIt(pool.storage[it].data) - for peer in sortedPeers: - yield peer - -proc `[]`*[A, B]( - pool: PeerPool[A, B], - key: B -): A {.inline, raises: [KeyError].} = + var sorted = initHeapQueue[PeerIndex]() + for peerIdx in pool.registry.values(): + if (PeerFlags.Acquired in pool.storage[peerIdx.data].flags) and + (pool.storage[peerIdx.data].peerType in filter): + sorted.push(peerIdx) + while len(sorted) > 0: + let peerIdx = sorted.pop() + yield pool.storage[peerIdx.data].data + +proc `[]`*[A, B](pool: PeerPool[A, B], key: B): A {.inline, raises: [KeyError].} = ## Retrieve peer with key ``key`` from PeerPool ``pool``. - pool.storage[pool.registry[key]].data + let pindex = pool.registry[key] + pool.storage[pindex.data] -proc `[]`*[A, B]( - pool: var PeerPool[A, B], - key: B -): var A {.inline, raises: [KeyError].} = +proc `[]`*[A, B](pool: var PeerPool[A, B], key: B): var A {.inline, raises: [KeyError].} = ## Retrieve peer with key ``key`` from PeerPool ``pool``. - pool.storage[pool.registry[key]].data + let pindex = pool.registry[key] + pool.storage[pindex.data].data proc hasPeer*[A, B](pool: PeerPool[A, B], key: B): bool {.inline.} = ## Returns ``true`` if peer with ``key`` present in PeerPool ``pool``. @@ -857,9 +710,9 @@ proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B): A {.inline.} = ## Retrieves the peer from PeerPool ``pool`` using key ``key``. If peer is ## not present, default initialization value for type ``A`` is returned ## (e.g. 0 for any integer type). - let pindex = pool.registry.getOrDefault(key, PeerIndex(-1)) - if pindex != PeerIndex(-1): - pool.storage[pindex].data + let pindex = pool.registry.getOrDefault(key, PeerIndex(data: -1)) + if pindex.data >= 0: + pool.storage[pindex.data].data else: A() @@ -867,32 +720,29 @@ proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B, default: A): A {.inline.} = ## Retrieves the peer from PeerPool ``pool`` using key ``key``. If peer is ## not present, default value ``default`` is returned. - let pindex = pool.registry.getOrDefault(key, PeerIndex(-1)) - if pindex != PeerIndex(-1): - pool.storage[pindex].data + let pindex = pool.registry.getOrDefault(key, PeerIndex(data: -1)) + if pindex.data >= 0: + pool.storage[pindex.data].data else: default proc clear*[A, B](pool: PeerPool[A, B]) = ## Performs PeerPool's ``pool`` storage and counters reset. + pool.incQueue.clear() + pool.outQueue.clear() pool.registry.clear() - - pool.sorted.reset() for i in 0 ..< len(pool.storage): pool.storage[i] = PeerItem[A]() - pool.empties.reset() - pool.storage.reset() + pool.storage.setLen(0) pool.curIncPeersCount = 0 pool.curOutPeersCount = 0 pool.acqIncPeersCount = 0 pool.acqOutPeersCount = 0 -proc clearSafe*[A, B]( - pool: PeerPool[A, B] -) {.async: (raises: [CancelledError]).} = +proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async: (raises: [CancelledError]).} = ## Performs "safe" clear. Safe means that it first acquires all the peers ## in PeerPool, and only after that it will reset storage. - var acquired: seq[A] + var acquired = newSeq[A]() while len(pool.registry) > len(acquired): var peers = await pool.acquire(len(pool.registry) - len(acquired)) for item in peers: diff --git a/tests/test_peer_pool.nim b/tests/test_peer_pool.nim index 16579b657e..bf648e07b1 100644 --- a/tests/test_peer_pool.nim +++ b/tests/test_peer_pool.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2019-2025 Status Research & Development GmbH +# Copyright (c) 2019-2024 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -8,17 +8,19 @@ {.push raises: [].} {.used.} -import std/[random, heapqueue, tables, sequtils, strutils] -import chronos, chronos/unittest2/asynctests +import std/[random, heapqueue, tables] +import chronos import ../beacon_chain/networking/peer_pool import ./testutil +template closureScope(raisesAnnotation: untyped, body: untyped): untyped = + (proc() {.raises: raisesAnnotation} = body)() + type PeerTestID = string PeerTest = object id: PeerTestID weight: int - metadata: uint64 future: Future[void] func getKey(peer: PeerTest): PeerTestID = @@ -27,9 +29,6 @@ func getKey(peer: PeerTest): PeerTestID = func getFuture(peer: PeerTest): Future[void] = peer.future -func getMetadata(peer: PeerTest): uint64 = - peer.metadata - func `<`(a, b: PeerTest): bool = `<`(a.weight, b.weight) @@ -37,14 +36,6 @@ proc init*(t: typedesc[PeerTest], id: string = "", weight: int = 0): PeerTest = PeerTest(id: id, weight: weight, future: newFuture[void]()) -proc init*(t: typedesc[PeerTest], id: string = "", - weight: int = 0, metadata: uint64): PeerTest = - PeerTest(id: id, weight: weight, future: newFuture[void](), - metadata: metadata) - -proc toString(a: openArray[PeerTest]): string = - "[" & a.mapIt(it.getKey()).join(",") & "]" - proc close(peer: PeerTest) = peer.future.complete() @@ -250,7 +241,7 @@ suite "PeerPool testing suite": itemFut23.finished == false itemFut24.finished == false - test "Acquire/Sorting and consistency test": + test "Acquire/Sorting and consistency test": closureScope([CatchableError]): const TestsCount = 1000 MaxNumber = 1_000_000 @@ -319,140 +310,61 @@ suite "PeerPool testing suite": check waitFor(testAcquireRelease()) == TestsCount - asyncTest "deletePeer() test": - var pool = newPeerPool[PeerTest, PeerTestID]() - - ## Delete available peer - block: - let peer = PeerTest.init("deletePeer") - check: - pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success - pool.len == 1 - pool.lenAvailable == 1 - pool.lenAvailable({PeerType.Outgoing}) == 0 - pool.lenAvailable({PeerType.Incoming}) == 1 - pool.deletePeer(peer) == true - pool.len == 0 - pool.lenAvailable == 0 - pool.lenAvailable({PeerType.Outgoing}) == 0 - pool.lenAvailable({PeerType.Incoming}) == 0 - - ## Delete acquired peer - block: - let peer = PeerTest.init("closingPeer") - check: - pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success - pool.len == 1 - pool.lenAvailable == 1 - pool.lenAvailable({PeerType.Outgoing}) == 0 - pool.lenAvailable({PeerType.Incoming}) == 1 - let apeer = await pool.acquire() - check: - pool.deletePeer(peer) == true - pool.len == 1 - pool.lenAvailable == 0 - pool.lenAvailable({PeerType.Outgoing}) == 0 - pool.lenAvailable({PeerType.Incoming}) == 0 - pool.release(apeer) - check: - pool.len == 0 - pool.lenAvailable == 0 - pool.lenAvailable({PeerType.Outgoing}) == 0 - pool.lenAvailable({PeerType.Incoming}) == 0 - - ## Force delete acquired peer - block: - let peer = PeerTest.init("closingPeer") - check: - pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success - pool.len == 1 - pool.lenAvailable == 1 - pool.lenAvailable({PeerType.Outgoing}) == 0 - pool.lenAvailable({PeerType.Incoming}) == 1 - let apeer = await pool.acquire() - check: - pool.deletePeer(apeer, true) == true - pool.len == 0 - pool.lenAvailable == 0 - pool.lenAvailable({PeerType.Outgoing}) == 0 - pool.lenAvailable({PeerType.Incoming}) == 0 - - ## Delete single available peer in pool full of peers - block: - for i in 0 ..< 100: - let peer = PeerTest.init("peer" & $i) - check pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success - for i in 100 ..< 200: - let peer = PeerTest.init("peer" & $i) - check pool.addPeerNoWait(peer, PeerType.Outgoing) == PeerStatus.Success - check: - pool.len == 200 - pool.lenAvailable == 200 - pool.lenAvailable({PeerType.Outgoing}) == 100 - pool.lenAvailable({PeerType.Incoming}) == 100 - for i in 0 ..< 20: - let - index = 90 + i - peerKey = "peer" & $index - dpeer = pool.getOrDefault(peerKey, default(PeerTest)) - check: - pool.deletePeer(dpeer) == true - pool.hasPeer(peerKey) == false - check: - pool.len == 180 - pool.lenAvailable == 180 - pool.lenAvailable({PeerType.Outgoing}) == 90 - pool.lenAvailable({PeerType.Incoming}) == 90 - pool.clear() - - ## Delete single acquired peer in pool full of peers - block: - for i in 0 ..< 100: - let peer = PeerTest.init("peer" & $i) - check pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success - for i in 100 ..< 200: - let peer = PeerTest.init("peer" & $i) - check pool.addPeerNoWait(peer, PeerType.Outgoing) == PeerStatus.Success - check: - pool.len == 200 - pool.lenAvailable == 200 - pool.lenAvailable({PeerType.Outgoing}) == 100 - pool.lenAvailable({PeerType.Incoming}) == 100 + test "deletePeer() test": + proc testDeletePeer(): Future[bool] {.async.} = + var pool = newPeerPool[PeerTest, PeerTestID]() + var peer = PeerTest.init("deletePeer") - for i in 0 ..< 20: - let apeer = await pool.acquire() - check pool.deletePeer(apeer) == true - pool.release(apeer) - check pool.hasPeer(apeer.getKey()) == false + ## Delete available peer + doAssert(pool.addPeerNoWait(peer, + PeerType.Incoming) == PeerStatus.Success) + doAssert(pool.len == 1) + doAssert(pool.lenAvailable == 1) + doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) + doAssert(pool.lenAvailable({PeerType.Incoming}) == 1) + doAssert(pool.deletePeer(peer) == true) + doAssert(pool.len == 0) + doAssert(pool.lenAvailable == 0) + doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) + doAssert(pool.lenAvailable({PeerType.Incoming}) == 0) - check: - pool.len == 180 - pool.lenAvailable == 180 - pool.clear() - - ## Force delete single acquired peer in pool full of peers - block: - for i in 0 ..< 100: - let peer = PeerTest.init("peer" & $i) - check pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success - for i in 100 ..< 200: - let peer = PeerTest.init("peer" & $i) - check pool.addPeerNoWait(peer, PeerType.Outgoing) == PeerStatus.Success - check: - pool.len == 200 - pool.lenAvailable == 200 - pool.lenAvailable({PeerType.Outgoing}) == 100 - pool.lenAvailable({PeerType.Incoming}) == 100 + ## Delete acquired peer + peer = PeerTest.init("closingPeer") + doAssert(pool.addPeerNoWait(peer, + PeerType.Incoming) == PeerStatus.Success) + doAssert(pool.len == 1) + doAssert(pool.lenAvailable == 1) + doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) + doAssert(pool.lenAvailable({PeerType.Incoming}) == 1) + var apeer = await pool.acquire() + doAssert(pool.deletePeer(peer) == true) + doAssert(pool.len == 1) + doAssert(pool.lenAvailable == 0) + doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) + doAssert(pool.lenAvailable({PeerType.Incoming}) == 0) + pool.release(apeer) + doAssert(pool.len == 0) + doAssert(pool.lenAvailable == 0) + doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) + doAssert(pool.lenAvailable({PeerType.Incoming}) == 0) - for i in 0 ..< 20: - let apeer = await pool.acquire() - check: - pool.deletePeer(apeer, true) == true - pool.hasPeer(apeer.getKey()) == false + ## Force delete acquired peer + peer = PeerTest.init("closingPeer") + doAssert(pool.addPeerNoWait(peer, + PeerType.Incoming) == PeerStatus.Success) + doAssert(pool.len == 1) + doAssert(pool.lenAvailable == 1) + doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) + doAssert(pool.lenAvailable({PeerType.Incoming}) == 1) + apeer = await pool.acquire() + doAssert(pool.deletePeer(peer, true) == true) + doAssert(pool.len == 0) + doAssert(pool.lenAvailable == 0) + doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) + doAssert(pool.lenAvailable({PeerType.Incoming}) == 0) - check: - pool.len == 180 - pool.lenAvailable == 180 + result = true + check waitFor(testDeletePeer()) == true test "Peer lifetime test": proc testPeerLifetime(): Future[bool] {.async.} = @@ -503,7 +415,7 @@ suite "PeerPool testing suite": check waitFor(testPeerLifetime()) == true - test "Safe/Clear test": + test "Safe/Clear test": closureScope([CatchableError]): var pool = newPeerPool[PeerTest, PeerTestID]() var peer1 = PeerTest.init("peer1", 10) var peer2 = PeerTest.init("peer2", 9) @@ -550,7 +462,7 @@ suite "PeerPool testing suite": asyncSpawn testConsumer() check waitFor(testClose()) == true - test "Access peers by key test": + test "Access peers by key test": closureScope([CatchableError]): var pool = newPeerPool[PeerTest, PeerTestID]() var peer1 = PeerTest.init("peer1", 10) var peer2 = PeerTest.init("peer2", 9) @@ -679,285 +591,6 @@ suite "PeerPool testing suite": len(acqui2) == 2 len(acqui3) == 1 - asyncTest "Custom filters test": - var pool = newPeerPool[PeerTest, PeerTestID]() - let - peer1 = PeerTest.init("peer1", 10, 256'u64) - peer2 = PeerTest.init("peer2", 9, 0'u64) - peer3 = PeerTest.init("peer3", 8, 4'u64) - peer4 = PeerTest.init("peer4", 7, 2'u64) - peer5 = PeerTest.init("peer5", 6, 2'u64) - peer6 = PeerTest.init("peer6", 5, 2'u64) - peer7 = PeerTest.init("peer7", 4, 4'u64) - peer8 = PeerTest.init("peer8", 3, 128'u64) - peer9 = PeerTest.init("peer9", 2, 4'u64) - peer10 = PeerTest.init("peer10", 1, 256'u64) - - proc custom1(peer: PeerTest): bool = - true - - proc custom2(peer: PeerTest): bool = - if peer.getMetadata() == 2'u64: - true - else: - false - - proc custom3(peer: PeerTest): bool = - if peer.getMetadata() in [2'u64, 4'u64]: - true - else: - false - - check: - pool.addPeerNoWait(peer2, PeerType.Incoming) == PeerStatus.Success - pool.addPeerNoWait(peer3, PeerType.Incoming) == PeerStatus.Success - pool.addPeerNoWait(peer1, PeerType.Incoming) == PeerStatus.Success - pool.addPeerNoWait(peer4, PeerType.Incoming) == PeerStatus.Success - pool.addPeerNoWait(peer5, PeerType.Incoming) == PeerStatus.Success - - pool.addPeerNoWait(peer10, PeerType.Outgoing) == PeerStatus.Success - pool.addPeerNoWait(peer7, PeerType.Outgoing) == PeerStatus.Success - pool.addPeerNoWait(peer6, PeerType.Outgoing) == PeerStatus.Success - pool.addPeerNoWait(peer8, PeerType.Outgoing) == PeerStatus.Success - pool.addPeerNoWait(peer9, PeerType.Outgoing) == PeerStatus.Success - - template checkTotal() = - let - total1 = - pool.peers({PeerType.Incoming, PeerType.Outgoing}, custom1).toSeq() - total2 = - pool.peers({PeerType.Incoming}, custom1).toSeq() - total3 = - pool.peers({PeerType.Outgoing}, custom1).toSeq() - total4 = - pool.peers({PeerType.Incoming, PeerType.Outgoing}, custom2).toSeq() - total5 = - pool.peers({PeerType.Incoming}, custom2).toSeq() - total6 = - pool.peers({PeerType.Outgoing}, custom2).toSeq() - total7 = - pool.peers({PeerType.Incoming, PeerType.Outgoing}, custom3).toSeq() - total8 = - pool.peers({PeerType.Incoming}, custom3).toSeq() - total9 = - pool.peers({PeerType.Outgoing}, custom3).toSeq() - - check: - total1.toString() == - "[peer1,peer2,peer3,peer4,peer5,peer6,peer7,peer8,peer9,peer10]" - total2.toString() == "[peer1,peer2,peer3,peer4,peer5]" - total3.toString() == "[peer6,peer7,peer8,peer9,peer10]" - total4.toString() == "[peer4,peer5,peer6]" - total5.toString() == "[peer4,peer5]" - total6.toString() == "[peer6]" - total7.toString() == "[peer3,peer4,peer5,peer6,peer7,peer9]" - total8.toString() == "[peer3,peer4,peer5]" - total9.toString() == "[peer6,peer7,peer9]" - - checkTotal() - - block: - let - avail1 = - pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, - custom1).toSeq() - avail2 = - pool.availablePeers({PeerType.Incoming}, custom1).toSeq() - avail3 = - pool.availablePeers({PeerType.Outgoing}, custom1).toSeq() - avail4 = - pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, - custom2).toSeq() - avail5 = - pool.availablePeers({PeerType.Incoming}, custom2).toSeq() - avail6 = - pool.availablePeers({PeerType.Outgoing}, custom2).toSeq() - avail7 = - pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, - custom3).toSeq() - avail8 = - pool.availablePeers({PeerType.Incoming}, custom3).toSeq() - avail9 = - pool.availablePeers({PeerType.Outgoing}, custom3).toSeq() - - check: - avail1.toString() == - "[peer1,peer2,peer3,peer4,peer5,peer6,peer7,peer8,peer9,peer10]" - avail2.toString() == "[peer1,peer2,peer3,peer4,peer5]" - avail3.toString() == "[peer6,peer7,peer8,peer9,peer10]" - avail4.toString() == "[peer4,peer5,peer6]" - avail5.toString() == "[peer4,peer5]" - avail6.toString() == "[peer6]" - avail7.toString() == "[peer3,peer4,peer5,peer6,peer7,peer9]" - avail8.toString() == "[peer3,peer4,peer5]" - avail9.toString() == "[peer6,peer7,peer9]" - - let - tpeer1 = await pool.acquire({PeerType.Incoming, PeerType.Outgoing}, - custom1) - tpeer2 = await pool.acquire({PeerType.Incoming}, custom2) - tpeer3 = await pool.acquire({PeerType.Outgoing}, custom2) - tpeer4 = await pool.acquire({PeerType.Incoming}, custom3) - tpeer5 = await pool.acquire({PeerType.Outgoing}, custom3) - - check: - tpeer1.getKey() == "peer1" - tpeer2.getKey() == "peer4" - tpeer3.getKey() == "peer6" - tpeer4.getKey() == "peer3" - tpeer5.getKey() == "peer7" - - checkTotal() - - block: - let - avail1 = - pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, - custom1).toSeq() - avail2 = - pool.availablePeers({PeerType.Incoming}, custom1).toSeq() - avail3 = - pool.availablePeers({PeerType.Outgoing}, custom1).toSeq() - avail4 = - pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, - custom2).toSeq() - avail5 = - pool.availablePeers({PeerType.Incoming}, custom2).toSeq() - avail6 = - pool.availablePeers({PeerType.Outgoing}, custom2).toSeq() - avail7 = - pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, - custom3).toSeq() - avail8 = - pool.availablePeers({PeerType.Incoming}, custom3).toSeq() - avail9 = - pool.availablePeers({PeerType.Outgoing}, custom3).toSeq() - - check: - avail1.toString() == "[peer2,peer5,peer8,peer9,peer10]" - avail2.toString() == "[peer2,peer5]" - avail3.toString() == "[peer8,peer9,peer10]" - - avail4.toString() == "[peer5]" - avail5.toString() == "[peer5]" - avail6.toString() == "[]" - - avail7.toString() == "[peer5,peer9]" - avail8.toString() == "[peer5]" - avail9.toString() == "[peer9]" - - let - tpeer6 = await pool.acquire({PeerType.Incoming, PeerType.Outgoing}, - custom1) - tpeer7 = await pool.acquire({PeerType.Incoming}, custom2) - tpeer8 = await pool.acquire({PeerType.Outgoing}, custom3) - tpeer9 = await pool.acquire({PeerType.Outgoing}, custom1) - tpeer10 = await pool.acquire({PeerType.Outgoing}, custom1) - - check: - tpeer6.getKey() == "peer2" - tpeer7.getKey() == "peer5" - tpeer8.getKey() == "peer9" - tpeer9.getKey() == "peer8" - tpeer10.getKey() == "peer10" - - checkTotal() - - block: - let - avail1 = - pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, - custom1).toSeq() - avail2 = - pool.availablePeers({PeerType.Incoming}, custom1).toSeq() - avail3 = - pool.availablePeers({PeerType.Outgoing}, custom1).toSeq() - avail4 = - pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, - custom2).toSeq() - avail5 = - pool.availablePeers({PeerType.Incoming}, custom2).toSeq() - avail6 = - pool.availablePeers({PeerType.Outgoing}, custom2).toSeq() - avail7 = - pool.availablePeers({PeerType.Incoming, PeerType.Outgoing}, - custom3).toSeq() - avail8 = - pool.availablePeers({PeerType.Incoming}, custom3).toSeq() - avail9 = - pool.availablePeers({PeerType.Outgoing}, custom3).toSeq() - - check: - avail1.toString() == "[]" - avail2.toString() == "[]" - avail3.toString() == "[]" - - avail4.toString() == "[]" - avail5.toString() == "[]" - avail6.toString() == "[]" - - avail7.toString() == "[]" - avail8.toString() == "[]" - avail9.toString() == "[]" - - let - fut1 = pool.acquire({PeerType.Incoming}, custom2) - fut2 = pool.acquire({PeerType.Incoming}, custom2) - fut3 = pool.acquire({PeerType.Outgoing}, custom3) - fut4 = pool.acquire({PeerType.Outgoing}, custom3) - - check: - fut1.finished == false - fut2.finished == false - fut3.finished == false - fut4.finished == false - - pool.release(tpeer1) - await sleepAsync(100.milliseconds) - check: - fut1.finished == false - fut2.finished == false - fut3.finished == false - fut4.finished == false - - pool.release(tpeer2) - await sleepAsync(100.milliseconds) - check: - fut1.finished == true - fut1.value.getKey() == "peer4" - fut2.finished == false - fut3.finished == false - fut4.finished == false - - pool.release(tpeer3) - await sleepAsync(100.milliseconds) - check: - fut2.finished == false - fut3.finished == true - fut3.value.getKey() == "peer6" - fut4.finished == false - - pool.release(tpeer5) - await sleepAsync(100.milliseconds) - check: - fut2.finished == false - fut4.finished == true - fut4.value.getKey() == "peer7" - - pool.release(tpeer4) - pool.release(tpeer6) - pool.release(tpeer8) - pool.release(tpeer10) - await sleepAsync(100.milliseconds) - check: - fut2.finished == false - - pool.release(tpeer7) - await sleepAsync(100.milliseconds) - check: - fut2.finished == true - fut2.value.getKey() == "peer5" - test "Score check test": var pool = newPeerPool[PeerTest, PeerTestID]() func scoreCheck(peer: PeerTest): bool = @@ -1256,8 +889,8 @@ suite "PeerPool testing suite": pool7.lenSpace({PeerType.Incoming}) == 0 pool7.lenSpace({PeerType.Outgoing}) == high(int) - 39 - # We could not check whole high(int), so we check 1000 items - for i in 0 ..< 1000: + # We could not check whole high(int), so we check 10_000 items + for i in 0 ..< 10_000: check: pool7.addPeerNoWait(PeerTest.init("idOut" & $i), PeerType.Outgoing) == PeerStatus.Success @@ -1281,8 +914,8 @@ suite "PeerPool testing suite": pool8.lenSpace({PeerType.Outgoing}) == 0 pool8.lenSpace({PeerType.Incoming}) == high(int) - 40 - # We could not check whole high(int), so we check 1000 items - for i in 0 ..< 1000: + # We could not check whole high(int), so we check 10_000 items + for i in 0 ..< 10_000: check: pool8.addPeerNoWait(PeerTest.init("idInc" & $i), PeerType.Incoming) == PeerStatus.Success @@ -1291,8 +924,8 @@ suite "PeerPool testing suite": pool8.lenSpace({PeerType.Incoming}) == high(int) - 40 - (i + 1) # POOL 9 - # We could not check whole high(int), so we check 1000 items - for i in 0 ..< 1000: + # We could not check whole high(int), so we check 10_000 items + for i in 0 ..< 10_000: check: pool9.addPeerNoWait(PeerTest.init("idInc" & $i), PeerType.Incoming) == PeerStatus.Success