Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions beacon_chain/spec/datatypes/fulu.nim
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type

type
DataColumn* = List[KzgCell, Limit(MAX_BLOB_COMMITMENTS_PER_BLOCK)]
ColumnIndices* = List[ColumnIndex, Limit(NUMBER_OF_COLUMNS)]

# https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.3/specs/fulu/das-core.md#datacolumnsidecar
DataColumnSidecar* = object
Expand All @@ -113,6 +114,10 @@ type
block_root*: Eth2Digest
index*: ColumnIndex

DataColumnsByRootIdentifier* = object
block_root*: Eth2Digest
indices*: ColumnIndices

# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/das-core.md#matrixentry
MatrixEntry* = object
cell*: Cell
Expand Down Expand Up @@ -615,6 +620,15 @@ func shortLog*(v: seq[DataColumnSidecar]): auto =
func shortLog*(x: seq[DataColumnIdentifier]): string =
  ## Renders each identifier as "<root>/<index>", comma-separated,
  ## wrapped in square brackets, e.g. "[abcd…/3, ef09…/7]".
  var parts = newSeqOfCap[string](x.len)
  for ident in x:
    parts.add(shortLog(ident.block_root) & "/" & $ident.index)
  "[" & parts.join(", ") & "]"

func shortLog*(xs: seq[DataColumnsByRootIdentifier]): string =
  ## Formats like: [abcd…/0,2,4, ef09…/1,3]
  ## One entry per identifier: short block root, then the requested
  ## column indices joined with commas.
  var entries = newSeqOfCap[string](xs.len)
  for ident in xs:
    var idxParts: seq[string]
    for idx in ident.indices:
      idxParts.add($idx)
    entries.add(shortLog(ident.block_root) & "/" & idxParts.join(","))
  "[" & entries.join(", ") & "]"

func shortLog*(x: seq[ColumnIndex]): string =
  ## Angle-bracketed, comma-separated list of column indices,
  ## e.g. "<0, 2, 4>".
  var acc = newStringOfCap(2 + x.len * 4)
  acc.add '<'
  for i, idx in x:
    if i > 0:
      acc.add ", "
    acc.add $idx
  acc.add '>'
  acc

Expand Down
109 changes: 52 additions & 57 deletions beacon_chain/sync/request_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
{.push raises: [].}

import chronos, chronicles
import ssz_serialization/types
import
../spec/[forks, network, peerdas_helpers],
../networking/eth2_network,
Expand Down Expand Up @@ -57,7 +58,7 @@ type
blobId: BlobIdentifier): Opt[ref BlobSidecar] {.gcsafe, raises: [].}

DataColumnLoaderFn = proc(
columnId: DataColumnIdentifier):
columnIds: DataColumnIdentifier):
Opt[ref DataColumnSidecar] {.gcsafe, raises: [].}

InhibitFn = proc: bool {.gcsafe, raises: [].}
Expand Down Expand Up @@ -130,6 +131,9 @@ func cmpSidecarIdentifier(x: BlobIdentifier | DataColumnIdentifier,
y: ref BlobSidecar | ref DataColumnSidecar): int =
cmp(x.index, y[].index)

func cmpColumnIndex(x: ColumnIndex, y: ref DataColumnSidecar): int =
  ## Comparator for `binarySearch`: orders a requested column index `x`
  ## against the index carried by sidecar `y`.
  ## Returns <0, 0, or >0 per the usual `cmp` contract.
  cmp(x, y[].index)

func checkResponseSanity(idList: seq[BlobIdentifier],
blobs: openArray[ref BlobSidecar]): bool =
# Cannot respond more than what I have asked
Expand Down Expand Up @@ -162,36 +166,24 @@ func checkResponseSubset(idList: seq[BlobIdentifier],
return false
true

func checkResponseSanity(idList: seq[DataColumnIdentifier],
columns: openArray[ref DataColumnSidecar]): bool =
# Cannot respond more than what I have asked
if columns.len > idList.len:
return false
var i = 0
while i < columns.len:
let
block_root =
hash_tree_root(columns[i][].signed_block_header.message)
idListKey = binarySearch(idList, columns[i], cmpSidecarIdentifier)

# Verify the block root
if idList[idListKey].block_root != block_root:
return false

# Verify inclusion proof
columns[i][].verify_data_column_sidecar_inclusion_proof().isOkOr:
return false
inc i
true

func checkResponseSubset(idList: seq[DataColumnIdentifier],
columns: openArray[ref DataColumnSidecar]): bool =
## Clients MUST respond with at least one sidecar, if they have it.
## Clients MAY limit the number of blocks and sidecars in the response.
## https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.3/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1
for col in columns:
if binarySearch(idList, col, cmpSidecarIdentifier) == -1:
func checkColumnResponse(idList: seq[DataColumnsByRootIdentifier],
                         columns: openArray[ref DataColumnSidecar]): bool =
  ## Sanity-checks a DataColumnSidecarsByRoot response against the request:
  ## every returned sidecar must (a) belong to a requested block root,
  ## (b) carry a column index that was requested for that root, and
  ## (c) have a valid inclusion proof.
  ## Returns false on the first violation, true otherwise.
  ## https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.3/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1
  for colresp in columns:
    let block_root =
      hash_tree_root(colresp[].signed_block_header.message)
    # Scan the request list once, comparing roots in place instead of
    # materializing a seq of roots with mapIt (avoids a per-sidecar
    # allocation and stops at the matching entry's checks).
    var rootRequested = false
    for id in idList:
      if id.block_root == block_root:
        rootRequested = true
        if binarySearch(id.indices.asSeq, colresp, cmpColumnIndex) == -1:
          # at the common block root level, the response
          # is NOT a subset of the request ids
          return false
        # verify the inclusion proof
        colresp[].verify_data_column_sidecar_inclusion_proof().isOkOr:
          return false
    if not rootRequested:
      # received a response that does not match the
      # block root of any of the items that were requested
      return false
  true

proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: (raises: [CancelledError]).} =
Expand Down Expand Up @@ -364,30 +356,24 @@ proc checkPeerCustody(rman: RequestManager,
return false

proc fetchDataColumnsFromNetwork(rman: RequestManager,
colIdList: seq[DataColumnIdentifier])
colIdList: seq[DataColumnsByRootIdentifier])
{.async: (raises: [CancelledError]).} =
var peer = await rman.network.peerPool.acquire()
try:
if rman.checkPeerCustody(peer):
debug "Requesting data columns by root", peer = peer, columns = shortLog(colIdList),
peer_score = peer.getScore()
let columns = await dataColumnSidecarsByRoot(peer, DataColumnIdentifierList colIdList)
let columns = await dataColumnSidecarsByRoot(peer, DataColumnsByRootIdentifierList colIdList)

if columns.isOk:
var ucolumns = columns.get().asSeq()
ucolumns.sort(cmpSidecarIndexes)
if not checkResponseSubset(colIdList, ucolumns):
if not checkColumnResponse(colIdList, ucolumns):
debug "Response to columns by root is not a subset",
peer = peer, columns = shortLog(colIdList), ucolumns = len(ucolumns)
peer.updateScore(PeerScoreBadResponse)
return

if not checkResponseSanity(colIdList, ucolumns):
debug "Response to columns by root have erroneous block root",
peer = peer, columns = shortLog(colIdList), ucolumns = len(ucolumns)
peer.updateScore(PeerScoreBadResponse)
return

for col in ucolumns:
rman.dataColumnQuarantine[].put(col)
var curRoot: Eth2Digest
Expand Down Expand Up @@ -575,7 +561,7 @@ proc requestManagerBlobLoop(
blobs_count = len(blobIds),
sync_speed = speed(start, finish)

proc getMissingDataColumns(rman: RequestManager): HashSet[DataColumnIdentifier] =
proc getMissingDataColumns(rman: RequestManager): seq[DataColumnsByRootIdentifier] =
let
wallTime = rman.getBeaconTime()
wallSlot = wallTime.slotOrZero()
Expand All @@ -584,7 +570,7 @@ proc getMissingDataColumns(rman: RequestManager): HashSet[DataColumnIdentifier]
const waitDur = TimeDiff(nanoseconds: DATA_COLUMN_GOSSIP_WAIT_TIME_NS)

var
fetches: HashSet[DataColumnIdentifier]
fetches: seq[DataColumnsByRootIdentifier]
ready: seq[Eth2Digest]

for columnless in rman.quarantine[].peekColumnless():
Expand All @@ -601,11 +587,17 @@ proc getMissingDataColumns(rman: RequestManager): HashSet[DataColumnIdentifier]
warn "quarantine is missing data columns, but missing indices are empty",
blk = columnless.root,
commitments = len(forkyBlck.message.body.blob_kzg_commitments)
for idx in missing.indices:
let id = DataColumnIdentifier(block_root: columnless.root, index: idx)
if id.index in rman.custody_columns_set and id notin fetches and
len(forkyBlck.message.body.blob_kzg_commitments) != 0:
fetches.incl(id)

let id = DataColumnsByRootIdentifier(
block_root: columnless.root,
indices: ColumnIndices.init(missing.indices))
for index in id.indices.asSeq:
if not(index in rman.custody_columns_set and id notin fetches and
len(forkyBlck.message.body.blob_kzg_commitments) != 0):
# do not include to fetches
discard
else:
fetches.add(id)
else:
# this is a programming error and it should not occur
warn "missing column handler found columnless block with all data columns",
Expand All @@ -631,10 +623,9 @@ proc requestManagerDataColumnLoop(
if missingColumnIds.len == 0:
continue

var columnIds: seq[DataColumnIdentifier]
var columnIds: seq[DataColumnsByRootIdentifier]
if rman.dataColumnLoader == nil:
for item in missingColumnIds:
columnIds.add item
columnIds = missingColumnIds
else:
var
blockRoots: seq[Eth2Digest]
Expand All @@ -643,14 +634,18 @@ proc requestManagerDataColumnLoop(
if columnId.block_root != curRoot:
curRoot = columnId.block_root
blockRoots.add curRoot
let data_column_sidecar = rman.dataColumnLoader(columnId).valueOr:
columnIds.add columnId
if blockRoots.len > 0 and blockRoots[^1] == curRoot:
# A data column is missing, remove from list of fully available data columns
discard blockRoots.pop()
continue
debug "Loaded orphaned data columns from storage", columnId
rman.dataColumnQuarantine[].put(data_column_sidecar)
for index in columnId.indices:
let loaderElem = DataColumnIdentifier(
block_root: columnId.block_root,
index: index)
let data_column_sidecar = rman.dataColumnLoader(loaderElem).valueOr:
columnIds.add columnId
if blockRoots.len > 0 and blockRoots[^1] == curRoot:
# A data column is missing, remove from list of fully available data columns
discard blockRoots.pop()
continue
debug "Loaded orphaned data columns from storage", columnId
rman.dataColumnQuarantine[].put(data_column_sidecar)
var verifiers = newSeqOfCap[
Future[Result[void, VerifierError]]
.Raising([CancelledError])](blockRoots.len)
Expand Down
39 changes: 21 additions & 18 deletions beacon_chain/sync/sync_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type
BlobIdentifier, Limit MAX_SUPPORTED_REQUEST_BLOB_SIDECARS]
DataColumnIdentifierList* = List[
DataColumnIdentifier, Limit (MAX_REQUEST_DATA_COLUMN_SIDECARS)]
DataColumnsByRootIdentifierList* = List[
DataColumnsByRootIdentifier, Limit (MAX_REQUEST_BLOCKS_DENEB)]

proc readChunkPayload*(
conn: Connection, peer: Peer, MsgType: type (ref ForkedSignedBeaconBlock)):
Expand Down Expand Up @@ -393,7 +395,7 @@ p2pProtocol BeaconSync(version = 1,
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1
proc dataColumnSidecarsByRoot(
peer: Peer,
colIds: DataColumnIdentifierList,
colIds: DataColumnsByRootIdentifierList,
response: MultipleChunksResponse[
ref DataColumnSidecar, Limit(MAX_REQUEST_DATA_COLUMN_SIDECARS)])
{.async, libp2pProtocol("data_column_sidecars_by_root", 1).} =
Expand All @@ -402,7 +404,7 @@ p2pProtocol BeaconSync(version = 1,
if colIds.len == 0:
raise newException(InvalidInputsError, "No data columns request for root")

if colIds.lenu64 > MAX_REQUEST_DATA_COLUMN_SIDECARS:
if colIds.lenu64 > MAX_REQUEST_BLOCKS_DENEB:
raise newException(InvalidInputsError, "Exceeding data column request limit")

let
Expand All @@ -417,25 +419,26 @@ p2pProtocol BeaconSync(version = 1,
let blockRef =
dag.getBlockRef(colIds[i].block_root).valueOr:
continue
let index =
colIds[i].index
if dag.db.getDataColumnSidecarSZ(blockRef.bid.root, index, bytes):
let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
warn "Cannot read data column size, database corrupt?",
bytes = bytes.len, blck = shortLog(blockRef), columnIndex = index
continue
let indices =
colIds[i].indices
for id in indices:
if dag.db.getDataColumnSidecarSZ(blockRef.bid.root, id, bytes):
let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
warn "Cannot read data column size, database corrupt?",
bytes = bytes.len, blck = shortLog(blockRef), columnIndex = id
continue

peer.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_root/1")
peer.network.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_root/1")
peer.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_root/1")
peer.network.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_root/1")

await response.writeBytesSZ(
uncompressedLen, bytes,
peer.network.forkDigestAtEpoch(blockRef.slot.epoch).data)
inc found
await response.writeBytesSZ(
uncompressedLen, bytes,
peer.network.forkDigestAtEpoch(blockRef.slot.epoch).data)
inc found

# additional logging for devnets
debug "responded to data column sidecar by root request",
peer, blck = shortLog(blockRef), columnIndex = index
# additional logging for devnets
debug "responded to data column sidecar by root request",
peer, blck = shortLog(blockRef), columnIndex = id

debug "Data column root request done",
peer, roots = colIds.len, count, found
Expand Down
Loading