Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,9 @@ proc initFullNode(
{}
syncManager = newSyncManager[Peer, PeerId](
node.network.peerPool,
dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS,
dag.cfg.DENEB_FORK_EPOCH,
dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS,
dag.cfg.MAX_BLOBS_PER_BLOCK_ELECTRA,
SyncQueueKind.Forward, getLocalHeadSlot,
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
getFrontfillSlot, isWithinWeakSubjectivityPeriod,
Expand All @@ -516,7 +518,9 @@ proc initFullNode(
flags = syncManagerFlags)
backfiller = newSyncManager[Peer, PeerId](
node.network.peerPool,
dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS,
dag.cfg.DENEB_FORK_EPOCH,
dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS,
dag.cfg.MAX_BLOBS_PER_BLOCK_ELECTRA,
SyncQueueKind.Backward, getLocalHeadSlot,
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
getFrontfillSlot, isWithinWeakSubjectivityPeriod,
Expand All @@ -530,7 +534,9 @@ proc initFullNode(
getLocalWallSlot()
untrustedManager = newSyncManager[Peer, PeerId](
node.network.peerPool,
dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS,
dag.cfg.DENEB_FORK_EPOCH,
dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS,
dag.cfg.MAX_BLOBS_PER_BLOCK_ELECTRA,
SyncQueueKind.Backward, getLocalHeadSlot,
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getUntrustedBackfillSlot,
getFrontfillSlot, isWithinWeakSubjectivityPeriod,
Expand Down
16 changes: 12 additions & 4 deletions beacon_chain/sync/sync_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type
pool: PeerPool[A, B]
DENEB_FORK_EPOCH: Epoch
MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: uint64
MAX_BLOBS_PER_BLOCK_ELECTRA: uint64
responseTimeout: chronos.Duration
maxHeadAge: uint64
isWithinWeakSubjectivityPeriod: GetBoolCallback
Expand Down Expand Up @@ -142,6 +143,7 @@ proc newSyncManager*[A, B](
pool: PeerPool[A, B],
denebEpoch: Epoch,
minEpochsForBlobSidecarsRequests: uint64,
maxBlobsPerBlockElectra: uint64,
direction: SyncQueueKind,
getLocalHeadSlotCb: GetSlotCallback,
getLocalWallSlotCb: GetSlotCallback,
Expand Down Expand Up @@ -170,6 +172,7 @@ proc newSyncManager*[A, B](
pool: pool,
DENEB_FORK_EPOCH: denebEpoch,
MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: minEpochsForBlobSidecarsRequests,
MAX_BLOBS_PER_BLOCK_ELECTRA: maxBlobsPerBlockElectra,
getLocalHeadSlot: getLocalHeadSlotCb,
getLocalWallSlot: getLocalWallSlotCb,
isWithinWeakSubjectivityPeriod: weakSubjectivityPeriodCb,
Expand Down Expand Up @@ -230,7 +233,9 @@ proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A,
sync_ident = man.ident,
topics = "syncman"

blobSidecarsByRange(peer, req.data.slot, req.data.count)
blobSidecarsByRange(
peer, req.data.slot, req.data.count,
maxResponseItems = (req.data.count * man.MAX_BLOBS_PER_BLOCK_ELECTRA).Limit)

proc remainingSlots(man: SyncManager): uint64 =
let
Expand Down Expand Up @@ -293,7 +298,8 @@ func checkBlobs(blobs: seq[BlobSidecars]): Result[void, string] =

proc getSyncBlockData*[T](
peer: T,
slot: Slot
slot: Slot,
maxBlobsPerBlockElectra: uint64
): Future[SyncBlockDataRes] {.async: (raises: [CancelledError]).} =
mixin getScore

Expand Down Expand Up @@ -353,7 +359,8 @@ proc getSyncBlockData*[T](
peer_score = peer.getScore(),
peer_speed = peer.netKbps(),
topics = "syncman"
let res = await blobSidecarsByRange(peer, slot, 1'u64)
let res = await blobSidecarsByRange(
peer, slot, 1'u64, maxResponseItems = maxBlobsPerBlockElectra.Limit)
if res.isErr():
peer.updateScore(PeerScoreNoValues)
return err(
Expand Down Expand Up @@ -449,7 +456,8 @@ proc getSyncBlockData[A, B](

if len(blobData) > 0:
let blobSlots = mapIt(blobData, it[].signed_block_header.message.slot)
checkBlobsResponse(sr, blobSlots).isOkOr:
checkBlobsResponse(
sr, blobSlots, man.MAX_BLOBS_PER_BLOCK_ELECTRA).isOkOr:
peer.updateScore(PeerScoreBadResponse)
return err("Incorrect blobs sequence received, reason: " & $error)

Expand Down
3 changes: 2 additions & 1 deletion beacon_chain/sync/sync_overseer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ proc getPeerBlock(
let peer = await overseer.pool.acquire()
try:
let
res = (await getSyncBlockData(peer, slot)).valueOr:
maxBlobs = overseer.consensusManager.dag.cfg.MAX_BLOBS_PER_BLOCK_ELECTRA
res = (await getSyncBlockData(peer, slot, maxBlobs)).valueOr:
return err(error)
blob =
if res.blobs.isSome():
Expand Down
10 changes: 6 additions & 4 deletions beacon_chain/sync/sync_queue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -968,13 +968,15 @@ proc checkResponse*[T](req: SyncRequest[T],

ok()

proc checkBlobsResponse*[T](req: SyncRequest[T],
data: openArray[Slot]): Result[void, cstring] =
proc checkBlobsResponse*[T](
req: SyncRequest[T],
data: openArray[Slot],
maxBlobsPerBlockElectra: uint64): Result[void, cstring] =
if len(data) == 0:
# Impossible to verify empty response.
return ok()

if lenu64(data) > (req.data.count * MAX_BLOBS_PER_BLOCK_ELECTRA):
if lenu64(data) > (req.data.count * maxBlobsPerBlockElectra):
# Number of blobs in response should be less or equal to number of
# requested (blocks * MAX_BLOBS_PER_BLOCK_ELECTRA).
# NOTE: This is not strict check, proper check will be done in blobs
Expand All @@ -991,7 +993,7 @@ proc checkBlobsResponse*[T](req: SyncRequest[T],
return err("Incorrect order")
if slot == pslot:
inc(counter)
if counter > MAX_BLOBS_PER_BLOCK_ELECTRA:
if counter > maxBlobsPerBlockElectra:
# NOTE: This is not strict check, proper check will be done in blobs
# validation.
return err("Number of blobs in the block exceeds the limit")
Expand Down
13 changes: 10 additions & 3 deletions tests/test_sync_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1363,14 +1363,21 @@ suite "SyncManager test suite":
checkResponse(r3, @[Slot(13), Slot(13)]).isOk() == false

test "[SyncQueue] checkBlobsResponse() test":
const maxBlobsPerBlockElectra = 9

proc checkBlobsResponse[T](
req: SyncRequest[T],
data: openArray[Slot]): Result[void, cstring] =
checkBlobsResponse(req, data, maxBlobsPerBlockElectra)

let
r1 = SyncRequest[SomeTPeer](data: SyncRange.init(Slot(11), 1'u64))
r2 = SyncRequest[SomeTPeer](data: SyncRange.init(Slot(11), 2'u64))
r3 = SyncRequest[SomeTPeer](data: SyncRange.init(Slot(11), 3'u64))

d1 = Slot(11).repeat(MAX_BLOBS_PER_BLOCK_ELECTRA)
d2 = Slot(12).repeat(MAX_BLOBS_PER_BLOCK_ELECTRA)
d3 = Slot(13).repeat(MAX_BLOBS_PER_BLOCK_ELECTRA)
d1 = Slot(11).repeat(maxBlobsPerBlockElectra)
d2 = Slot(12).repeat(maxBlobsPerBlockElectra)
d3 = Slot(13).repeat(maxBlobsPerBlockElectra)

check:
checkBlobsResponse(r1, [Slot(11)]).isOk() == true
Expand Down