diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 98ac1fa45e..0b20f784f5 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -204,8 +204,8 @@ type Eth2NetworkingError = object case kind*: Eth2NetworkingErrorKind of ReceivedErrorResponse: - responseCode: ResponseCode - errorMsg: string + responseCode*: ResponseCode + errorMsg*: string else: discard diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 05ebb76540..c4a343d06e 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -331,6 +331,16 @@ proc initFullNode( blockProcessor[].addBlock(MsgSource.gossip, signedBlock, Opt.none(eip4844.BlobsSidecar), resfut) resfut + blockBlobsVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blobs: eip4844.BlobsSidecar): + Future[Result[void, VerifierError]] = + # The design with a callback for block verification is unusual compared + # to the rest of the application, but fits with the general approach + # taken in the sync/request managers - this is an architectural compromise + # that should probably be reimagined more holistically in the future. + let resfut = newFuture[Result[void, VerifierError]]("blockVerifier") + blockProcessor[].addBlock(MsgSource.gossip, signedBlock, + Opt.some(blobs), resfut) + resfut processor = Eth2Processor.new( config.doppelgangerDetection, blockProcessor, node.validatorMonitor, dag, attestationPool, @@ -377,7 +387,8 @@ proc initFullNode( node.processor = processor node.blockProcessor = blockProcessor node.consensusManager = consensusManager - node.requestManager = RequestManager.init(node.network, blockVerifier) + node.requestManager = RequestManager.init(node.network, dag.cfg, getBeaconTime, + blockVerifier, blockBlobsVerifier) node.syncManager = syncManager node.backfiller = backfiller node.router = router diff --git a/beacon_chain/sync/request_manager.nim b/beacon_chain/sync/request_manager.nim index 46034ed225..2d60823533 100644 --- a/beacon_chain/sync/request_manager.nim +++ b/beacon_chain/sync/request_manager.nim @@ -15,6 +15,7 @@ import ../networking/eth2_network, ../consensus_object_pools/block_quarantine, "."/sync_protocol, "."/sync_manager +from ../beacon_clock import GetBeaconTimeFn export block_quarantine, sync_manager logScope: @@ -31,11 +32,17 @@ type BlockVerifier* = proc(signedBlock: ForkedSignedBeaconBlock): Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].} + BlockBlobsVerifier* = + proc(signedBlock: ForkedSignedBeaconBlock, blobs: eip4844.BlobsSidecar): + Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].} RequestManager* = object network*: Eth2Node inpQueue*: AsyncQueue[FetchRecord] + cfg: RuntimeConfig + getBeaconTime: GetBeaconTimeFn blockVerifier: BlockVerifier + blockBlobsVerifier: BlockBlobsVerifier loopFuture: Future[void] func shortLog*(x: seq[Eth2Digest]): string = @@ -45,13 +52,33 @@ func shortLog*(x: seq[FetchRecord]): string = "[" & x.mapIt(shortLog(it.root)).join(", ") & "]" proc init*(T: type RequestManager, network: Eth2Node, - blockVerifier: BlockVerifier): RequestManager = + cfg: RuntimeConfig, + getBeaconTime: GetBeaconTimeFn, + blockVerifier: BlockVerifier, + blockBlobsVerifier: BlockBlobsVerifier): RequestManager = RequestManager( network: network, inpQueue: newAsyncQueue[FetchRecord](), - blockVerifier: blockVerifier + cfg: cfg, + getBeaconTime: getBeaconTime, + blockVerifier: blockVerifier, + blockBlobsVerifier: blockBlobsVerifier, ) +proc checkResponse(roots: openArray[Eth2Digest], + blocks: openArray[ref SignedBeaconBlockAndBlobsSidecar]): bool = + ## This procedure checks peer's response. + var checks = @roots + if len(blocks) > len(roots): + return false + for item in blocks: + let res = checks.find(item[].beacon_block.root) + if res == -1: + return false + else: + checks.del(res) + true + proc checkResponse(roots: openArray[Eth2Digest], blocks: openArray[ref ForkedSignedBeaconBlock]): bool = ## This procedure checks peer's response. @@ -138,6 +165,92 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager, if not(isNil(peer)): rman.network.peerPool.release(peer) +proc fetchAncestorBlocksAndBlobsFromNetwork(rman: RequestManager, + items: seq[Eth2Digest]) {.async.} = + var peer: Peer + try: + peer = await rman.network.peerPool.acquire() + debug "Requesting blocks by root", peer = peer, blocks = shortLog(items), + peer_score = peer.getScore() + + let blocks = (await beaconBlockAndBlobsSidecarByRoot_v1(peer, BlockRootsList items)) + + if blocks.isOk: + let ublocks = blocks.get() + if checkResponse(items, ublocks.asSeq()): + var + gotGoodBlock = false + gotUnviableBlock = false + + for b in ublocks: + let ver = await rman.blockBlobsVerifier(ForkedSignedBeaconBlock.init(b[].beacon_block), b[].blobs_sidecar) + if ver.isErr(): + case ver.error() + of VerifierError.MissingParent: + # Ignoring because the order of the blocks that + # we requested may be different from the order in which we need + # these blocks to apply. + discard + of VerifierError.Duplicate: + # Ignoring because these errors could occur due to the + # concurrent/parallel requests we made. + discard + of VerifierError.UnviableFork: + # If they're working a different fork, we'll want to descore them + # but also process the other blocks (in case we can register the + # other blocks as unviable) + gotUnviableBlock = true + of VerifierError.Invalid: + # We stop processing blocks because peer is either sending us + # junk or working a different fork + notice "Received invalid block", + peer = peer, blocks = shortLog(items), + peer_score = peer.getScore() + peer.updateScore(PeerScoreBadValues) + + return # Stop processing this junk... + else: + gotGoodBlock = true + + if gotUnviableBlock: + notice "Received blocks from an unviable fork", + peer = peer, blocks = shortLog(items), + peer_score = peer.getScore() + peer.updateScore(PeerScoreUnviableFork) + elif gotGoodBlock: + # We reward peer only if it returns something. + peer.updateScore(PeerScoreGoodValues) + else: + let err = blocks.error() + case err.kind + of ReceivedErrorResponse: + if err.responseCode == ResourceUnavailable: + if not(isNil(peer)): + rman.network.peerPool.release(peer) + await rman.fetchAncestorBlocksFromNetwork(items) + return + else: + peer.updateScore(PeerScoreBadResponse) + else: + discard + else: + peer.updateScore(PeerScoreNoValues) + + except CancelledError as exc: + raise exc + except CatchableError as exc: + peer.updateScore(PeerScoreNoValues) + debug "Error while fetching ancestor blocks", exc = exc.msg, + items = shortLog(items), peer = peer, peer_score = peer.getScore() + raise exc + finally: + if not(isNil(peer)): + rman.network.peerPool.release(peer) + + +proc isBlobsTime(rman: RequestManager): bool = + rman.getBeaconTime().slotOrZero.epoch >= rman.cfg.EIP4844_FORK_EPOCH + proc requestManagerLoop(rman: RequestManager) {.async.} = var rootList = newSeq[Eth2Digest]() var workers = newSeq[Future[void]](PARALLEL_REQUESTS) @@ -154,8 +267,17 @@ proc requestManagerLoop(rman: RequestManager) {.async.} = let start = SyncMoment.now(0) + # As soon as EIP4844_FORK_EPOCH comes around in wall time, we + # switch to requesting blocks and blobs. In the vicinity of the + # transition, that means that we *may* request blobs for a + # pre-eip4844. In that case, we get ResourceUnavailable from the + # peer and fall back to requesting blocks only. + let getBlobs = rman.isBlobsTime() for i in 0 ..< PARALLEL_REQUESTS: - workers[i] = rman.fetchAncestorBlocksFromNetwork(rootList) + workers[i] = if getBlobs: + rman.fetchAncestorBlocksAndBlobsFromNetwork(rootList) + else: + rman.fetchAncestorBlocksFromNetwork(rootList) # We do not care about await allFutures(workers)