Skip to content

Commit 1efb15c

Browse files
committed
Data columns verifications: Batch
1 parent fe06732 commit 1efb15c

18 files changed

+1297
-770
lines changed

beacon-chain/core/peerdas/helpers.go

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -405,34 +405,53 @@ func DataColumnSidecarsForReconstruct(
405405
return sidecars, nil
406406
}
407407

408-
// VerifyDataColumnSidecarKZGProofs verifies the provided KZG Proofs for the particular
409-
// data column.
410-
func VerifyDataColumnSidecarKZGProofs(sc blocks.RODataColumn) (bool, error) {
408+
// VerifyDataColumnsSidecarKZGProofs verifies the provided KZG Proofs of data columns.
409+
func VerifyDataColumnsSidecarKZGProofs(sidecars []blocks.RODataColumn) (bool, error) {
410+
// Retrieve the number of columns.
411411
numberOfColumns := params.BeaconConfig().NumberOfColumns
412412

413-
if sc.ColumnIndex >= numberOfColumns {
414-
return false, errIndexTooLarge
413+
// Compute the total count.
414+
count := 0
415+
for _, sidecar := range sidecars {
416+
count += len(sidecar.DataColumn)
415417
}
416418

417-
if len(sc.DataColumn) != len(sc.KzgCommitments) || len(sc.KzgCommitments) != len(sc.KzgProof) {
418-
return false, errMismatchLength
419-
}
420-
421-
count := len(sc.DataColumn)
422-
423419
commitments := make([]kzg.Bytes48, 0, count)
424420
indices := make([]uint64, 0, count)
425421
cells := make([]kzg.Cell, 0, count)
426422
proofs := make([]kzg.Bytes48, 0, count)
427423

428-
for i := range sc.DataColumn {
429-
commitments = append(commitments, kzg.Bytes48(sc.KzgCommitments[i]))
430-
indices = append(indices, sc.ColumnIndex)
431-
cells = append(cells, kzg.Cell(sc.DataColumn[i]))
432-
proofs = append(proofs, kzg.Bytes48(sc.KzgProof[i]))
424+
for _, sidecar := range sidecars {
425+
// Check if the column index is not too large.
426+
if sidecar.ColumnIndex >= numberOfColumns {
427+
return false, errIndexTooLarge
428+
}
429+
430+
// Check if the KZG commitments size and data column size match.
431+
if len(sidecar.DataColumn) != len(sidecar.KzgCommitments) {
432+
return false, errMismatchLength
433+
}
434+
435+
// Check if the KZG proofs size and data column size match.
436+
if len(sidecar.DataColumn) != len(sidecar.KzgProof) {
437+
return false, errMismatchLength
438+
}
439+
440+
for i := range sidecar.DataColumn {
441+
commitments = append(commitments, kzg.Bytes48(sidecar.KzgCommitments[i]))
442+
indices = append(indices, sidecar.ColumnIndex)
443+
cells = append(cells, kzg.Cell(sidecar.DataColumn[i]))
444+
proofs = append(proofs, kzg.Bytes48(sidecar.KzgProof[i]))
445+
}
446+
}
447+
448+
// Verify the whole batch at once.
449+
verified, err := kzg.VerifyCellKZGProofBatch(commitments, indices, cells, proofs)
450+
if err != nil {
451+
return false, errors.Wrap(err, "verify cell KZG proof batch")
433452
}
434453

435-
return kzg.VerifyCellKZGProofBatch(commitments, indices, cells, proofs)
454+
return verified, nil
436455
}
437456

438457
// CustodySubnetCount returns the number of subnets the node should participate in for custody.

beacon-chain/core/peerdas/helpers_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
9393
for i, sidecar := range sCars {
9494
roCol, err := blocks.NewRODataColumn(sidecar)
9595
require.NoError(t, err)
96-
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(roCol)
96+
verified, err := peerdas.VerifyDataColumnsSidecarKZGProofs([]blocks.RODataColumn{roCol})
9797
require.NoError(t, err)
9898
require.Equal(t, true, verified, fmt.Sprintf("sidecar %d failed", i))
9999
}

beacon-chain/sync/data_columns_sampling.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ type dataColumnSampler1D struct {
6060
// peerFromColumn maps a column to the peer responsible for custody.
6161
peerFromColumn map[uint64]map[peer.ID]bool
6262
// columnVerifier verifies a column according to the specified requirements.
63-
columnVerifier verification.NewColumnVerifier
63+
columnVerifier verification.NewDataColumnsVerifier
6464
}
6565

6666
// newDataColumnSampler1D creates a new 1D data column sampler.
@@ -69,7 +69,7 @@ func newDataColumnSampler1D(
6969
clock *startup.Clock,
7070
ctxMap ContextByteVersions,
7171
stateNotifier statefeed.Notifier,
72-
colVerifier verification.NewColumnVerifier,
72+
colVerifier verification.NewDataColumnsVerifier,
7373
) *dataColumnSampler1D {
7474
numColumns := params.BeaconConfig().NumberOfColumns
7575
peerFromColumn := make(map[uint64]map[peer.ID]bool, numColumns)
@@ -509,7 +509,7 @@ func verifyColumn(
509509
root [32]byte,
510510
pid peer.ID,
511511
requestedColumns map[uint64]bool,
512-
columnVerifier verification.NewColumnVerifier,
512+
dataColumnsVerifier verification.NewDataColumnsVerifier,
513513
) bool {
514514
retrievedColumn := roDataColumn.ColumnIndex
515515

@@ -538,9 +538,11 @@ func verifyColumn(
538538
return false
539539
}
540540

541-
vf := columnVerifier(roDataColumn, verification.SamplingColumnSidecarRequirements)
541+
// TODO: Once peer sampling is used, we should verify all sampled data columns in a single batch.
542+
verifier := dataColumnsVerifier([]blocks.RODataColumn{roDataColumn}, verification.SamplingColumnSidecarRequirements)
543+
542544
// Filter out columns which did not pass the KZG inclusion proof verification.
543-
if err := vf.SidecarInclusionProven(); err != nil {
545+
if err := verifier.SidecarInclusionProven(); err != nil {
544546
log.WithFields(logrus.Fields{
545547
"peerID": pid,
546548
"root": fmt.Sprintf("%#x", root),
@@ -550,7 +552,7 @@ func verifyColumn(
550552
}
551553

552554
// Filter out columns which did not pass the KZG proof verification.
553-
if err := vf.SidecarKzgProofVerified(); err != nil {
555+
if err := verifier.SidecarKzgProofVerified(); err != nil {
554556
log.WithFields(logrus.Fields{
555557
"peerID": pid,
556558
"root": fmt.Sprintf("%#x", root),

beacon-chain/sync/data_columns_sampling_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ func setupDataColumnSamplerTest(t *testing.T, blobCount uint64) (*dataSamplerTes
202202
iniWaiter := verification.NewInitializerWaiter(clockSync, nil, nil)
203203
ini, err := iniWaiter.WaitForInitializer(context.Background())
204204
require.NoError(t, err)
205-
sampler := newDataColumnSampler1D(p2pSvc, clock, test.ctxMap, nil, newColumnVerifierFromInitializer(ini))
205+
sampler := newDataColumnSampler1D(p2pSvc, clock, test.ctxMap, nil, newDataColumnsVerifierFromInitializer(ini))
206206

207207
return test, sampler
208208
}

beacon-chain/sync/initial-sync/blocks_fetcher.go

Lines changed: 66 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ type blocksFetcherConfig struct {
8282
mode syncMode
8383
bs filesystem.BlobStorageSummarizer
8484
bv verification.NewBlobVerifier
85-
cv verification.NewColumnVerifier
85+
cv verification.NewDataColumnsVerifier
8686
}
8787

8888
// blocksFetcher is a service to fetch chain data from peers.
@@ -100,7 +100,7 @@ type blocksFetcher struct {
100100
db db.ReadOnlyDatabase
101101
bs filesystem.BlobStorageSummarizer
102102
bv verification.NewBlobVerifier
103-
cv verification.NewColumnVerifier
103+
cv verification.NewDataColumnsVerifier
104104
blocksPerPeriod uint64
105105
rateLimiter *leakybucket.Collector
106106
peerLocks map[peer.ID]*peerLock
@@ -1155,67 +1155,91 @@ func (f *blocksFetcher) waitForPeersForDataColumns(
11551155
return dataColumnsByAdmissiblePeer, nil
11561156
}
11571157

1158-
// processDataColumn mutates `bwbs` argument by adding the data column,
1158+
// processDataColumns mutates `bwbs` argument by adding the data column,
11591159
// and mutates `missingColumnsByRoot` by removing the data column if the
11601160
// data column passes all the checks.
1161-
func processDataColumn(
1161+
func (f *blocksFetcher) processDataColumns(
11621162
wrappedBwbsMissingColumns *bwbsMissingColumns,
1163-
columnVerifier verification.NewColumnVerifier,
1164-
blocksByRoot map[[fieldparams.RootLength]byte]blocks.ROBlock,
1163+
blockByRoot map[[fieldparams.RootLength]byte]blocks.ROBlock,
11651164
indicesByRoot map[[fieldparams.RootLength]byte][]int,
1166-
dataColumn blocks.RODataColumn,
1165+
dataColumns []blocks.RODataColumn,
11671166
) bool {
1168-
// Extract the block root from the data column.
1169-
blockRoot := dataColumn.BlockRoot()
1170-
1171-
// Find the position of the block in `bwbs` that corresponds to this block root.
1172-
indices, ok := indicesByRoot[blockRoot]
1173-
if !ok {
1174-
// The peer returned a data column that we did not expect.
1175-
// This is among others possible when the peer is not on the same fork.
1176-
return false
1177-
}
1167+
// Filter out data columns:
1168+
// - that are not expected and,
1169+
// - which correspond to blocks before Deneb.
1170+
1171+
// Unexpected data columns are possible (among other causes) when
1172+
// the peer is not on the same fork, due to the nature of
1173+
// data columns by range requests.
1174+
wrappedBlockDataColumns := make([]verify.WrappedBlockDataColumn, 0, len(dataColumns))
1175+
for _, dataColumn := range dataColumns {
1176+
// Extract the block root from the data column.
1177+
blockRoot := dataColumn.BlockRoot()
1178+
1179+
// Skip if the block root is not expected.
1180+
// This is possible when the peer is not on the same fork.
1181+
_, ok := indicesByRoot[blockRoot]
1182+
if !ok {
1183+
continue
1184+
}
11781185

1179-
// Extract the block from the block root.
1180-
block, ok := blocksByRoot[blockRoot]
1181-
if !ok {
1182-
// This should never happen.
1183-
log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Error("Fetch data columns from peers - block not found")
1184-
return false
1185-
}
1186+
// Retrieve the block from the block root.
1187+
block, ok := blockByRoot[blockRoot]
1188+
if !ok {
1189+
// This should never happen.
1190+
log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Error("Fetch data columns from peers - block not found for root")
1191+
return false
1192+
}
1193+
1194+
// Skip if the block is before Deneb.
1195+
if block.Version() < version.Deneb {
1196+
continue
1197+
}
11861198

1187-
// Verify the data column.
1188-
if err := verify.ColumnAlignsWithBlock(dataColumn, block, columnVerifier); err != nil {
1189-
log.WithError(err).WithFields(logrus.Fields{
1190-
"root": fmt.Sprintf("%#x", blockRoot),
1191-
"slot": block.Block().Slot(),
1192-
"column": dataColumn.ColumnIndex,
1193-
}).Warning("Fetch data columns from peers - fetched data column does not align with block")
1199+
wrappedBlockDataColumn := verify.WrappedBlockDataColumn{
1200+
ROBlock: block,
1201+
DataColumn: dataColumn,
1202+
}
1203+
1204+
wrappedBlockDataColumns = append(wrappedBlockDataColumns, wrappedBlockDataColumn)
1205+
}
11941206

1207+
// Verify the data columns.
1208+
if err := verify.DataColumnsAlignWithBlock(wrappedBlockDataColumns, f.cv); err != nil {
11951209
// TODO: Should we downscore the peer for that?
11961210
return false
11971211
}
11981212

1199-
// Populate the corresponding items in `bwbs`.
1200-
func() {
1201-
mu := &wrappedBwbsMissingColumns.mu
1213+
wrappedBwbsMissingColumns.mu.Lock()
1214+
defer wrappedBwbsMissingColumns.mu.Unlock()
1215+
1216+
bwbs := wrappedBwbsMissingColumns.bwbs
1217+
missingColumnsByRoot := wrappedBwbsMissingColumns.missingColumnsByRoot
12021218

1203-
mu.Lock()
1204-
defer mu.Unlock()
1219+
for _, wrappedBlockDataColumn := range wrappedBlockDataColumns {
1220+
dataColumn := wrappedBlockDataColumn.DataColumn
12051221

1206-
bwbs := wrappedBwbsMissingColumns.bwbs
1207-
missingColumnsByRoot := wrappedBwbsMissingColumns.missingColumnsByRoot
1222+
// Extract the block root from the data column.
1223+
blockRoot := dataColumn.BlockRoot()
1224+
1225+
// Extract the indices in bwb corresponding to the block root.
1226+
indices, ok := indicesByRoot[blockRoot]
1227+
if !ok {
1228+
// This should never happen.
1229+
log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Error("Fetch data columns from peers - indices not found for root")
1230+
return false
1231+
}
12081232

1233+
// Populate the corresponding items in `bwbs`.
12091234
for _, index := range indices {
12101235
bwbs[index].Columns = append(bwbs[index].Columns, dataColumn)
12111236
}
1212-
12131237
// Remove the column from the missing columns.
12141238
delete(missingColumnsByRoot[blockRoot], dataColumn.ColumnIndex)
12151239
if len(missingColumnsByRoot[blockRoot]) == 0 {
12161240
delete(missingColumnsByRoot, blockRoot)
12171241
}
1218-
}()
1242+
}
12191243

12201244
return true
12211245
}
@@ -1299,17 +1323,8 @@ func (f *blocksFetcher) fetchDataColumnFromPeer(
12991323
return
13001324
}
13011325

1302-
globalSuccess := false
1303-
1304-
for _, dataColumn := range roDataColumns {
1305-
success := processDataColumn(wrappedBwbsMissingColumns, f.cv, blocksByRoot, indicesByRoot, dataColumn)
1306-
if success {
1307-
globalSuccess = true
1308-
}
1309-
}
1310-
1311-
if !globalSuccess {
1312-
log.Debug("Fetch data columns from peers - no valid data column returned")
1326+
if !f.processDataColumns(wrappedBwbsMissingColumns, blocksByRoot, indicesByRoot, roDataColumns) {
1327+
log.Warning("Fetch data columns from peers - at least one data column is invalid")
13131328
return
13141329
}
13151330

beacon-chain/sync/initial-sync/blocks_fetcher_test.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,7 @@ func TestBlocksFetcher_scheduleRequest(t *testing.T) {
414414
fetcher.scheduleRequest(context.Background(), 1, blockBatchLimit))
415415
})
416416
}
417+
417418
func TestBlocksFetcher_handleRequest(t *testing.T) {
418419
blockBatchLimit := flags.Get().BlockBatchLimit
419420
chainConfig := struct {
@@ -1988,14 +1989,9 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
19881989
{slot: 38, columnIndex: 6, alterate: true},
19891990
{slot: 38, columnIndex: 70},
19901991
},
1991-
},
1992-
(&ethpb.DataColumnSidecarsByRangeRequest{
1993-
StartSlot: 38,
1994-
Count: 1,
1995-
Columns: []uint64{6},
1996-
}).String(): {
19971992
{
19981993
{slot: 38, columnIndex: 6},
1994+
{slot: 38, columnIndex: 70},
19991995
},
20001996
},
20011997
},
@@ -2243,7 +2239,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
22432239
ctxMap: map[[4]byte]int{{245, 165, 253, 66}: version.Deneb},
22442240
p2p: p2pSvc,
22452241
bs: blobStorageSummarizer,
2246-
cv: newColumnVerifierFromInitializer(ini),
2242+
cv: newDataColumnsVerifierFromInitializer(ini),
22472243
})
22482244

22492245
// Fetch the data columns from the peers.

beacon-chain/sync/initial-sync/blocks_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ type blocksQueueConfig struct {
7373
mode syncMode
7474
bs filesystem.BlobStorageSummarizer
7575
bv verification.NewBlobVerifier
76-
cv verification.NewColumnVerifier
76+
cv verification.NewDataColumnsVerifier
7777
}
7878

7979
// blocksQueue is a priority queue that serves as a intermediary between block fetchers (producers)

beacon-chain/sync/initial-sync/round_robin.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (s *Service) startBlocksQueue(ctx context.Context, highestSlot primitives.S
8989
mode: mode,
9090
bs: summarizer,
9191
bv: s.newBlobVerifier,
92-
cv: s.newColumnVerifier,
92+
cv: s.newDataColumnsVerifier,
9393
}
9494
queue := newBlocksQueue(ctx, cfg)
9595
if err := queue.start(); err != nil {
@@ -176,7 +176,7 @@ func (s *Service) processFetchedDataRegSync(
176176
return
177177
}
178178
if coreTime.PeerDASIsActive(startSlot) {
179-
bv := verification.NewDataColumnBatchVerifier(s.newColumnVerifier, verification.InitsyncColumnSidecarRequirements)
179+
bv := verification.NewDataColumnBatchVerifier(s.newDataColumnsVerifier, verification.InitsyncColumnSidecarRequirements)
180180
avs := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage, bv, s.cfg.P2P.NodeID())
181181
batchFields := logrus.Fields{
182182
"firstSlot": data.bwb[0].Block.Block().Slot(),
@@ -367,7 +367,7 @@ func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
367367
}
368368
var aStore das.AvailabilityStore
369369
if coreTime.PeerDASIsActive(first.Block().Slot()) {
370-
bv := verification.NewDataColumnBatchVerifier(s.newColumnVerifier, verification.InitsyncColumnSidecarRequirements)
370+
bv := verification.NewDataColumnBatchVerifier(s.newDataColumnsVerifier, verification.InitsyncColumnSidecarRequirements)
371371
avs := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage, bv, s.cfg.P2P.NodeID())
372372
s.logBatchSyncStatus(genesis, first, len(bwb))
373373
for _, bb := range bwb {

0 commit comments

Comments
 (0)