Skip to content
4 changes: 2 additions & 2 deletions beacon_chain/networking/eth2_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ type
Eth2NetworkingError = object
case kind*: Eth2NetworkingErrorKind
of ReceivedErrorResponse:
responseCode: ResponseCode
errorMsg: string
responseCode*: ResponseCode
errorMsg*: string
else:
discard

Expand Down
13 changes: 12 additions & 1 deletion beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,16 @@ proc initFullNode(
blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.none(eip4844.BlobsSidecar), resfut)
resfut
blockBlobsVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blobs: eip4844.BlobsSidecar):
Comment thread
henridf marked this conversation as resolved.
Future[Result[void, VerifierError]] =
# The design with a callback for block verification is unusual compared
# to the rest of the application, but fits with the general approach
# taken in the sync/request managers - this is an architectural compromise
# that should probably be reimagined more holistically in the future.
let resfut = newFuture[Result[void, VerifierError]]("blockVerifier")
blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.some(blobs), resfut)
resfut
processor = Eth2Processor.new(
config.doppelgangerDetection,
blockProcessor, node.validatorMonitor, dag, attestationPool,
Expand Down Expand Up @@ -377,7 +387,8 @@ proc initFullNode(
node.processor = processor
node.blockProcessor = blockProcessor
node.consensusManager = consensusManager
node.requestManager = RequestManager.init(node.network, blockVerifier)
node.requestManager = RequestManager.init(node.network, dag.cfg, getBeaconTime,
blockVerifier, blockBlobsVerifier)
node.syncManager = syncManager
node.backfiller = backfiller
node.router = router
Expand Down
128 changes: 125 additions & 3 deletions beacon_chain/sync/request_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import
../networking/eth2_network,
../consensus_object_pools/block_quarantine,
"."/sync_protocol, "."/sync_manager
from ../beacon_clock import GetBeaconTimeFn
export block_quarantine, sync_manager

logScope:
Expand All @@ -31,11 +32,17 @@ type
BlockVerifier* =
proc(signedBlock: ForkedSignedBeaconBlock):
Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}
BlockBlobsVerifier* =
proc(signedBlock: ForkedSignedBeaconBlock, blobs: eip4844.BlobsSidecar):
Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}

RequestManager* = object
network*: Eth2Node
inpQueue*: AsyncQueue[FetchRecord]
cfg: RuntimeConfig
getBeaconTime: GetBeaconTimeFn
blockVerifier: BlockVerifier
blockBlobsVerifier: BlockBlobsVerifier
loopFuture: Future[void]

func shortLog*(x: seq[Eth2Digest]): string =
Expand All @@ -45,13 +52,33 @@ func shortLog*(x: seq[FetchRecord]): string =
"[" & x.mapIt(shortLog(it.root)).join(", ") & "]"

proc init*(T: type RequestManager, network: Eth2Node,
           cfg: RuntimeConfig,
           getBeaconTime: GetBeaconTimeFn,
           blockVerifier: BlockVerifier,
           blockBlobsVerifier: BlockBlobsVerifier): RequestManager =
  ## Construct a RequestManager.
  ##
  ## `cfg` and `getBeaconTime` let the manager decide, per wall clock,
  ## whether to request blocks only (`blockVerifier`) or blocks together
  ## with blobs sidecars (`blockBlobsVerifier`).
  RequestManager(
    network: network,
    inpQueue: newAsyncQueue[FetchRecord](),
    cfg: cfg,
    getBeaconTime: getBeaconTime,
    blockVerifier: blockVerifier,
    blockBlobsVerifier: blockBlobsVerifier,
  )

proc checkResponse(roots: openArray[Eth2Digest],
                   blocks: openArray[ref SignedBeaconBlockAndBlobsSidecar]): bool =
  ## Check a peer's blocks-and-blobs response against the requested roots:
  ## the peer may not return more blocks than requested, and every returned
  ## block must match one of the requested roots (each root matched at most
  ## once). Returns true when the response is well-formed.
  var checks = @roots
  if len(blocks) > len(roots):
    return false
  for item in blocks:
    let res = checks.find(item[].beacon_block.root)
    if res == -1:
      return false
    else:
      # `del` is an O(1) unordered removal — order is irrelevant here, only
      # membership counts.
      checks.del(res)
  true

proc checkResponse(roots: openArray[Eth2Digest],
blocks: openArray[ref ForkedSignedBeaconBlock]): bool =
## This procedure checks peer's response.
Expand Down Expand Up @@ -138,6 +165,92 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
if not(isNil(peer)):
rman.network.peerPool.release(peer)

proc fetchAncestorBlocksAndBlobsFromNetwork(rman: RequestManager,
                                            items: seq[Eth2Digest]) {.async.} =
  ## Acquire a peer, request the given block roots together with their blobs
  ## sidecars, feed each result to `blockBlobsVerifier` and adjust the peer's
  ## score accordingly. If the peer reports ResourceUnavailable (e.g. it
  ## cannot serve blobs for these roots), fall back to a blocks-only request.
  # (This has a lot of duplication with fetchAncestorBlocksFromNetwork, but
  # there isn't a shared chunk that can be cleanly factored out.)
  var peer: Peer
  try:
    peer = await rman.network.peerPool.acquire()
    debug "Requesting blocks by root", peer = peer, blocks = shortLog(items),
                                       peer_score = peer.getScore()

    let blocks = (await beaconBlockAndBlobsSidecarByRoot_v1(peer, BlockRootsList items))

    if blocks.isOk:
      let ublocks = blocks.get()
      if checkResponse(items, ublocks.asSeq()):
        var
          gotGoodBlock = false
          gotUnviableBlock = false

        for b in ublocks:
          let ver = await rman.blockBlobsVerifier(
            ForkedSignedBeaconBlock.init(b[].beacon_block), b[].blobs_sidecar)
          if ver.isErr():
            case ver.error()
            of VerifierError.MissingParent:
              # Ignoring because the order of the blocks that
              # we requested may be different from the order in which we need
              # these blocks to apply.
              discard
            of VerifierError.Duplicate:
              # Ignoring because these errors could occur due to the
              # concurrent/parallel requests we made.
              discard
            of VerifierError.UnviableFork:
              # If they're working a different fork, we'll want to descore them
              # but also process the other blocks (in case we can register the
              # other blocks as unviable)
              gotUnviableBlock = true
            of VerifierError.Invalid:
              # We stop processing blocks because peer is either sending us
              # junk or working a different fork
              notice "Received invalid block",
                     peer = peer, blocks = shortLog(items),
                     peer_score = peer.getScore()
              peer.updateScore(PeerScoreBadValues)

              return # Stop processing this junk...
          else:
            gotGoodBlock = true

        if gotUnviableBlock:
          notice "Received blocks from an unviable fork",
                 peer = peer, blocks = shortLog(items),
                 peer_score = peer.getScore()
          peer.updateScore(PeerScoreUnviableFork)
        elif gotGoodBlock:
          # We reward peer only if it returns something.
          peer.updateScore(PeerScoreGoodValues)
    else:
      let err = blocks.error()
      case err.kind
      of ReceivedErrorResponse:
        if err.responseCode == ResourceUnavailable:
          # The peer cannot serve blobs for these roots (e.g. pre-eip4844
          # blocks): release it and retry with a blocks-only request.
          if not(isNil(peer)):
            rman.network.peerPool.release(peer)
            # Clear the reference so the `finally` clause below does not
            # release the peer a second time (double-release bug in the
            # original).
            peer = nil
          await rman.fetchAncestorBlocksFromNetwork(items)
          return
        else:
          peer.updateScore(PeerScoreBadResponse)
      else:
        peer.updateScore(PeerScoreNoValues)

  except CancelledError as exc:
    raise exc
  except CatchableError as exc:
    # `peer` is nil if `acquire` itself failed — guard before scoring.
    if not(isNil(peer)):
      peer.updateScore(PeerScoreNoValues)
    debug "Error while fetching ancestor blocks", exc = exc.msg,
          items = shortLog(items), peer = peer, peer_score = peer.getScore()
    raise exc
  finally:
    if not(isNil(peer)):
      rman.network.peerPool.release(peer)


proc isBlobsTime(rman: RequestManager): bool =
  ## True once the wall-clock epoch has reached EIP4844_FORK_EPOCH, i.e.
  ## once blobs sidecars should be requested alongside blocks.
  let wallEpoch = rman.getBeaconTime().slotOrZero.epoch
  wallEpoch >= rman.cfg.EIP4844_FORK_EPOCH

proc requestManagerLoop(rman: RequestManager) {.async.} =
var rootList = newSeq[Eth2Digest]()
var workers = newSeq[Future[void]](PARALLEL_REQUESTS)
Expand All @@ -154,8 +267,17 @@ proc requestManagerLoop(rman: RequestManager) {.async.} =

let start = SyncMoment.now(0)

# As soon as EIP4844_FORK_EPOCH comes around in wall time, we
# switch to requesting blocks and blobs. In the vicinity of the
# transition, that means that we *may* request blobs for a
# pre-eip4844. In that case, we get ResourceUnavailable from the
# peer and fall back to requesting blocks only.
let getBlobs = rman.isBlobsTime()
for i in 0 ..< PARALLEL_REQUESTS:
workers[i] = rman.fetchAncestorBlocksFromNetwork(rootList)
workers[i] = if getBlobs:
rman.fetchAncestorBlocksAndBlobsFromNetwork(rootList)
else:
rman.fetchAncestorBlocksFromNetwork(rootList)

# We do not care about
await allFutures(workers)
Expand Down