Skip to content

Commit 94efe33

Browse files
authored
replace:DataColumnIdentifier -> DataColumnsByRootIdentifier while root requesting (#7117)
* save work * refactor more, save work * add spec * address reviews * updated link * that shouldn't be columnIds
1 parent de75f6d commit 94efe33

File tree

3 files changed

+88
-75
lines changed

3 files changed

+88
-75
lines changed

beacon_chain/spec/datatypes/fulu.nim

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ type
9595

9696
type
9797
DataColumn* = List[KzgCell, Limit(MAX_BLOB_COMMITMENTS_PER_BLOCK)]
98+
DataColumnIndices* = List[ColumnIndex, Limit(NUMBER_OF_COLUMNS)]
9899

99100
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.3/specs/fulu/das-core.md#datacolumnsidecar
100101
DataColumnSidecar* = object
@@ -113,6 +114,11 @@ type
113114
block_root*: Eth2Digest
114115
index*: ColumnIndex
115116

117+
# https://github.com/ethereum/consensus-specs/blob/b8b5fbb8d16f52d42a716fa93289062fe2124c7c/specs/fulu/p2p-interface.md#datacolumnsbyrootidentifier
118+
DataColumnsByRootIdentifier* = object
119+
block_root*: Eth2Digest
120+
indices*: DataColumnIndices
121+
116122
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/das-core.md#matrixentry
117123
MatrixEntry* = object
118124
cell*: Cell
@@ -615,6 +621,15 @@ func shortLog*(v: seq[DataColumnSidecar]): auto =
615621
func shortLog*(x: seq[DataColumnIdentifier]): string =
616622
"[" & x.mapIt(shortLog(it.block_root) & "/" & $it.index).join(", ") & "]"
617623

624+
func shortLog*(xs: seq[DataColumnsByRootIdentifier]): string =
  ## Formats like: [abcd…/0,2,4, ef09…/1,3]
  ## One entry per requested block root, listing its column indices.
  var parts = newSeqOfCap[string](xs.len)
  for entry in xs:
    var ids = newSeqOfCap[string](entry.indices.len)
    for idx in entry.indices:
      ids.add $idx
    parts.add(shortLog(entry.block_root) & "/" & ids.join(","))
  "[" & parts.join(", ") & "]"
632+
618633
func shortLog*(x: seq[ColumnIndex]): string =
619634
"<" & x.mapIt($it).join(", ") & ">"
620635

beacon_chain/sync/request_manager.nim

Lines changed: 51 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
{.push raises: [].}
99

1010
import chronos, chronicles
11+
import ssz_serialization/types
1112
import
1213
../spec/[forks, network, peerdas_helpers],
1314
../networking/eth2_network,
@@ -130,6 +131,9 @@ func cmpSidecarIdentifier(x: BlobIdentifier | DataColumnIdentifier,
130131
y: ref BlobSidecar | ref DataColumnSidecar): int =
131132
cmp(x.index, y[].index)
132133

134+
func cmpColumnIndex(x: ColumnIndex, y: ref DataColumnSidecar): int =
  ## Comparator used to binary-search a sorted list of requested column
  ## indices for the index carried by a received sidecar.
  result = cmp(x, y[].index)
136+
133137
func checkResponseSanity(idList: seq[BlobIdentifier],
134138
blobs: openArray[ref BlobSidecar]): bool =
135139
# Cannot respond more than what I have asked
@@ -162,36 +166,24 @@ func checkResponseSubset(idList: seq[BlobIdentifier],
162166
return false
163167
true
164168

165-
func checkResponseSanity(idList: seq[DataColumnIdentifier],
166-
columns: openArray[ref DataColumnSidecar]): bool =
167-
# Cannot respond more than what I have asked
168-
if columns.len > idList.len:
169-
return false
170-
var i = 0
171-
while i < columns.len:
172-
let
173-
block_root =
174-
hash_tree_root(columns[i][].signed_block_header.message)
175-
idListKey = binarySearch(idList, columns[i], cmpSidecarIdentifier)
176-
177-
# Verify the block root
178-
if idList[idListKey].block_root != block_root:
179-
return false
180-
181-
# Verify inclusion proof
182-
columns[i][].verify_data_column_sidecar_inclusion_proof().isOkOr:
183-
return false
184-
inc i
185-
true
186-
187-
func checkResponseSubset(idList: seq[DataColumnIdentifier],
188-
columns: openArray[ref DataColumnSidecar]): bool =
189-
## Clients MUST respond with at least one sidecar, if they have it.
190-
## Clients MAY limit the number of blocks and sidecars in the response.
191-
## https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.3/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1
192-
for col in columns:
193-
if binarySearch(idList, col, cmpSidecarIdentifier) == -1:
169+
func checkColumnResponse(idList: seq[DataColumnsByRootIdentifier],
                         columns: openArray[ref DataColumnSidecar]): bool =
  ## Validates a data_column_sidecars_by_root response against the request:
  ## every received sidecar must belong to one of the requested block roots,
  ## its column index must be among the indices requested for that root, and
  ## its inclusion proof must verify.
  ##
  ## NOTE(review): `binarySearch` requires `id.indices` to be sorted
  ## ascending — confirm the request builder always produces sorted lists.
  for colresp in columns:
    let block_root =
      hash_tree_root(colresp[].signed_block_header.message)
    # Single pass over the request list; avoids the previous per-sidecar
    # `mapIt` allocation + linear scan that only tested root membership
    # before the same list was scanned again.
    var rootRequested = false
    for id in idList:
      if id.block_root == block_root:
        rootRequested = true
        if binarySearch(id.indices.asSeq, colresp, cmpColumnIndex) == -1:
          # at the common block root level, the response
          # is NOT a subset of the requested indices
          return false
    if not rootRequested:
      # received a response that does not match the
      # block root of any of the items that were requested
      return false
    # verify the inclusion proof — once per sidecar, not once per
    # matching request entry
    colresp[].verify_data_column_sidecar_inclusion_proof().isOkOr:
      return false
  true
196188

197189
proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: (raises: [CancelledError]).} =
@@ -364,30 +356,24 @@ proc checkPeerCustody(rman: RequestManager,
364356
return false
365357

366358
proc fetchDataColumnsFromNetwork(rman: RequestManager,
367-
colIdList: seq[DataColumnIdentifier])
359+
colIdList: seq[DataColumnsByRootIdentifier])
368360
{.async: (raises: [CancelledError]).} =
369361
var peer = await rman.network.peerPool.acquire()
370362
try:
371363
if rman.checkPeerCustody(peer):
372364
debug "Requesting data columns by root", peer = peer, columns = shortLog(colIdList),
373365
peer_score = peer.getScore()
374-
let columns = await dataColumnSidecarsByRoot(peer, DataColumnIdentifierList colIdList)
366+
let columns = await dataColumnSidecarsByRoot(peer, DataColumnsByRootIdentifierList colIdList)
375367

376368
if columns.isOk:
377369
var ucolumns = columns.get().asSeq()
378370
ucolumns.sort(cmpSidecarIndexes)
379-
if not checkResponseSubset(colIdList, ucolumns):
371+
if not checkColumnResponse(colIdList, ucolumns):
380372
debug "Response to columns by root is not a subset",
381373
peer = peer, columns = shortLog(colIdList), ucolumns = len(ucolumns)
382374
peer.updateScore(PeerScoreBadResponse)
383375
return
384376

385-
if not checkResponseSanity(colIdList, ucolumns):
386-
debug "Response to columns by root have erroneous block root",
387-
peer = peer, columns = shortLog(colIdList), ucolumns = len(ucolumns)
388-
peer.updateScore(PeerScoreBadResponse)
389-
return
390-
391377
for col in ucolumns:
392378
rman.dataColumnQuarantine[].put(col)
393379
var curRoot: Eth2Digest
@@ -575,7 +561,7 @@ proc requestManagerBlobLoop(
575561
blobs_count = len(blobIds),
576562
sync_speed = speed(start, finish)
577563

578-
proc getMissingDataColumns(rman: RequestManager): HashSet[DataColumnIdentifier] =
564+
proc getMissingDataColumns(rman: RequestManager): seq[DataColumnsByRootIdentifier] =
579565
let
580566
wallTime = rman.getBeaconTime()
581567
wallSlot = wallTime.slotOrZero()
@@ -584,7 +570,7 @@ proc getMissingDataColumns(rman: RequestManager): HashSet[DataColumnIdentifier]
584570
const waitDur = TimeDiff(nanoseconds: DATA_COLUMN_GOSSIP_WAIT_TIME_NS)
585571

586572
var
587-
fetches: HashSet[DataColumnIdentifier]
573+
fetches: seq[DataColumnsByRootIdentifier]
588574
ready: seq[Eth2Digest]
589575

590576
for columnless in rman.quarantine[].peekColumnless():
@@ -601,11 +587,17 @@ proc getMissingDataColumns(rman: RequestManager): HashSet[DataColumnIdentifier]
601587
warn "quarantine is missing data columns, but missing indices are empty",
602588
blk = columnless.root,
603589
commitments = len(forkyBlck.message.body.blob_kzg_commitments)
604-
for idx in missing.indices:
605-
let id = DataColumnIdentifier(block_root: columnless.root, index: idx)
606-
if id.index in rman.custody_columns_set and id notin fetches and
607-
len(forkyBlck.message.body.blob_kzg_commitments) != 0:
608-
fetches.incl(id)
590+
591+
let id = DataColumnsByRootIdentifier(
592+
block_root: columnless.root,
593+
indices: DataColumnIndices.init(missing.indices))
594+
for index in id.indices.asSeq:
595+
if not(index in rman.custody_columns_set and id notin fetches and
596+
len(forkyBlck.message.body.blob_kzg_commitments) != 0):
597+
# do not include to fetches
598+
discard
599+
else:
600+
fetches.add(id)
609601
else:
610602
# this is a programming error and it should not occur
611603
warn "missing column handler found columnless block with all data columns",
@@ -631,10 +623,9 @@ proc requestManagerDataColumnLoop(
631623
if missingColumnIds.len == 0:
632624
continue
633625

634-
var columnIds: seq[DataColumnIdentifier]
626+
var columnIds: seq[DataColumnsByRootIdentifier]
635627
if rman.dataColumnLoader == nil:
636-
for item in missingColumnIds:
637-
columnIds.add item
628+
columnIds = missingColumnIds
638629
else:
639630
var
640631
blockRoots: seq[Eth2Digest]
@@ -643,14 +634,18 @@ proc requestManagerDataColumnLoop(
643634
if columnId.block_root != curRoot:
644635
curRoot = columnId.block_root
645636
blockRoots.add curRoot
646-
let data_column_sidecar = rman.dataColumnLoader(columnId).valueOr:
647-
columnIds.add columnId
648-
if blockRoots.len > 0 and blockRoots[^1] == curRoot:
649-
# A data column is missing, remove from list of fully available data columns
650-
discard blockRoots.pop()
651-
continue
652-
debug "Loaded orphaned data columns from storage", columnId
653-
rman.dataColumnQuarantine[].put(data_column_sidecar)
637+
for index in columnId.indices:
638+
let loaderElem = DataColumnIdentifier(
639+
block_root: columnId.block_root,
640+
index: index)
641+
let data_column_sidecar = rman.dataColumnLoader(loaderElem).valueOr:
642+
columnIds.add columnId
643+
if blockRoots.len > 0 and blockRoots[^1] == curRoot:
644+
# A data column is missing, remove from list of fully available data columns
645+
discard blockRoots.pop()
646+
continue
647+
debug "Loaded orphaned data columns from storage", columnId
648+
rman.dataColumnQuarantine[].put(data_column_sidecar)
654649
var verifiers = newSeqOfCap[
655650
Future[Result[void, VerifierError]]
656651
.Raising([CancelledError])](blockRoots.len)

beacon_chain/sync/sync_protocol.nim

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ type
4141
BlobIdentifier, Limit MAX_SUPPORTED_REQUEST_BLOB_SIDECARS]
4242
DataColumnIdentifierList* = List[
4343
DataColumnIdentifier, Limit (MAX_REQUEST_DATA_COLUMN_SIDECARS)]
44+
DataColumnsByRootIdentifierList* = List[
45+
DataColumnsByRootIdentifier, Limit (MAX_REQUEST_BLOCKS_DENEB)]
4446

4547
proc readChunkPayload*(
4648
conn: Connection, peer: Peer, MsgType: type (ref ForkedSignedBeaconBlock)):
@@ -390,10 +392,10 @@ p2pProtocol BeaconSync(version = 1,
390392
peer.networkState.dag.cfg.MAX_BLOBS_PER_BLOCK_ELECTRA,
391393
peer.networkState.dag.cfg.MAX_REQUEST_BLOB_SIDECARS_ELECTRA)
392394

393-
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1
395+
# https://github.com/ethereum/consensus-specs/blob/b8b5fbb8d16f52d42a716fa93289062fe2124c7c/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1
394396
proc dataColumnSidecarsByRoot(
395397
peer: Peer,
396-
colIds: DataColumnIdentifierList,
398+
colIds: DataColumnsByRootIdentifierList,
397399
response: MultipleChunksResponse[
398400
ref DataColumnSidecar, Limit(MAX_REQUEST_DATA_COLUMN_SIDECARS)])
399401
{.async, libp2pProtocol("data_column_sidecars_by_root", 1).} =
@@ -402,7 +404,7 @@ p2pProtocol BeaconSync(version = 1,
402404
if colIds.len == 0:
403405
raise newException(InvalidInputsError, "No data columns request for root")
404406

405-
if colIds.lenu64 > MAX_REQUEST_DATA_COLUMN_SIDECARS:
407+
if colIds.lenu64 > MAX_REQUEST_BLOCKS_DENEB:
406408
raise newException(InvalidInputsError, "Exceeding data column request limit")
407409

408410
let
@@ -417,25 +419,26 @@ p2pProtocol BeaconSync(version = 1,
417419
let blockRef =
418420
dag.getBlockRef(colIds[i].block_root).valueOr:
419421
continue
420-
let index =
421-
colIds[i].index
422-
if dag.db.getDataColumnSidecarSZ(blockRef.bid.root, index, bytes):
423-
let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
424-
warn "Cannot read data column size, database corrupt?",
425-
bytes = bytes.len, blck = shortLog(blockRef), columnIndex = index
426-
continue
422+
let indices =
423+
colIds[i].indices
424+
for id in indices:
425+
if dag.db.getDataColumnSidecarSZ(blockRef.bid.root, id, bytes):
426+
let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
427+
warn "Cannot read data column size, database corrupt?",
428+
bytes = bytes.len, blck = shortLog(blockRef), columnIndex = id
429+
continue
427430

428-
peer.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_root/1")
429-
peer.network.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_root/1")
431+
peer.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_root/1")
432+
peer.network.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_root/1")
430433

431-
await response.writeBytesSZ(
432-
uncompressedLen, bytes,
433-
peer.network.forkDigestAtEpoch(blockRef.slot.epoch).data)
434-
inc found
434+
await response.writeBytesSZ(
435+
uncompressedLen, bytes,
436+
peer.network.forkDigestAtEpoch(blockRef.slot.epoch).data)
437+
inc found
435438

436-
# additional logging for devnets
437-
debug "responsded to data column sidecar by root request",
438-
peer, blck = shortLog(blockRef), columnIndex = index
439+
# additional logging for devnets
440+
debug "responded to data column sidecar by root request",
441+
peer, blck = shortLog(blockRef), columnIndex = id
439442

440443
debug "Data column root request done",
441444
peer, roots = colIds.len, count, found

0 commit comments

Comments
 (0)