@@ -10,6 +10,7 @@ import (
 
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/pkg/errors"
+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/verify"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
 	"github.com/sirupsen/logrus"
 
@@ -60,7 +61,7 @@ type dataColumnSampler1D struct {
 	// peerFromColumn maps a column to the peer responsible for custody.
 	peerFromColumn map[uint64]map[peer.ID]bool
 	// columnVerifier verifies a column according to the specified requirements.
-	columnVerifier verification.NewColumnVerifier
+	columnVerifier verification.NewDataColumnsVerifier
 }
 
 // newDataColumnSampler1D creates a new 1D data column sampler.
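Note: the sampler now stores a `verification.NewDataColumnsVerifier` instead of a `verification.NewColumnVerifier`, i.e. a verifier factory that covers a batch of data columns rather than one column at a time. A minimal sketch of the assumed difference between the two function types, inferred only from how they are used in this diff (the exact Prysm definitions may differ):

// Assumed shapes, for illustration only; not the actual Prysm declarations.
// Old: build a verifier around a single column sidecar.
type NewColumnVerifier func(roDataColumn blocks.RODataColumn, reqs []Requirement) ColumnVerifier

// New: build a verifier around a batch of column sidecars, so KZG proofs for
// several columns of the same block can be checked together.
type NewDataColumnsVerifier func(roDataColumns []blocks.RODataColumn, reqs []Requirement) DataColumnsVerifier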
@@ -69,7 +70,7 @@ func newDataColumnSampler1D(
 	clock *startup.Clock,
 	ctxMap ContextByteVersions,
 	stateNotifier statefeed.Notifier,
-	colVerifier verification.NewColumnVerifier,
+	colVerifier verification.NewDataColumnsVerifier,
 ) *dataColumnSampler1D {
 	numColumns := params.BeaconConfig().NumberOfColumns
 	peerFromColumn := make(map[uint64]map[peer.ID]bool, numColumns)
@@ -265,7 +266,7 @@ func (d *dataColumnSampler1D) handleStateNotification(ctx context.Context, event
 	samplesCount := min(params.BeaconConfig().SamplesPerSlot, uint64(len(d.nonCustodyColumns))-params.BeaconConfig().NumberOfColumns/2)
 
 	// TODO: Use the first output of `incrementalDAS` as input of the fork choice rule.
-	_, _, err = d.incrementalDAS(ctx, data.BlockRoot, randomizedColumns, samplesCount)
+	_, _, err = d.incrementalDAS(ctx, data, randomizedColumns, samplesCount)
 	if err != nil {
 		log.WithError(err).Error("Failed to run incremental DAS")
 	}
@@ -276,21 +277,22 @@ func (d *dataColumnSampler1D) handleStateNotification(ctx context.Context, event
 // According to https://github.com/ethereum/consensus-specs/issues/3825, we're going to select query samples exclusively from the non-custody columns.
 func (d *dataColumnSampler1D) incrementalDAS(
 	ctx context.Context,
-	root [fieldparams.RootLength]byte,
+	blockProcessedData *statefeed.BlockProcessedData,
 	columns []uint64,
 	sampleCount uint64,
 ) (bool, []roundSummary, error) {
 	allowedFailures := uint64(0)
 	firstColumnToSample, extendedSampleCount := uint64(0), peerdas.ExtendedSampleCount(sampleCount, allowedFailures)
 	roundSummaries := make([]roundSummary, 0, 1) // We optimistically allocate only one round summary.
+	blockRoot := blockProcessedData.BlockRoot
 
 	start := time.Now()
 
 	for round := 1; ; /* No exit condition */ round++ {
 		if extendedSampleCount > uint64(len(columns)) {
 			// We already tried to sample all possible columns, this is the unhappy path.
 			log.WithFields(logrus.Fields{
-				"root":  fmt.Sprintf("%#x", root),
+				"root":  fmt.Sprintf("%#x", blockRoot),
 				"round": round - 1,
 			}).Warning("Some columns are still missing after trying to sample all possible columns")
 			return false, roundSummaries, nil
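Note: this loop is the incremental DAS strategy: each failed round converts the missing columns into allowed failures and recomputes how far sampling must extend. A stripped-down sketch of that control flow, with `extendedSampleCount` stubbed (the real code uses `peerdas.ExtendedSampleCount`, which is probabilistic; this is an illustration, not the actual implementation):

// Illustration only: widen the sampling window until a round fully succeeds
// or every candidate column has been tried.
func widenUntilSampled(columns []uint64, sampleCount uint64, sample func([]uint64) (missing uint64)) bool {
	first, extended := uint64(0), extendedSampleCount(sampleCount, 0)
	allowedFailures := uint64(0)
	for {
		if extended > uint64(len(columns)) {
			return false // every candidate column was tried, some are still missing
		}
		missing := sample(columns[first:extended]) // sample only the newly added window
		if missing == 0 {
			return true // everything requested in this round was retrieved
		}
		allowedFailures += missing
		first, extended = extended, extendedSampleCount(sampleCount, allowedFailures)
	}
}

// Stub standing in for peerdas.ExtendedSampleCount.
func extendedSampleCount(sampleCount, allowedFailures uint64) uint64 {
	return sampleCount + allowedFailures // placeholder growth rule
}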
@@ -301,13 +303,13 @@ func (d *dataColumnSampler1D) incrementalDAS(
 		columnsToSampleCount := extendedSampleCount - firstColumnToSample
 
 		log.WithFields(logrus.Fields{
-			"root":    fmt.Sprintf("%#x", root),
+			"root":    fmt.Sprintf("%#x", blockRoot),
 			"columns": columnsToSample,
 			"round":   round,
 		}).Debug("Start data columns sampling")
 
 		// Sample data columns from peers in parallel.
-		retrievedSamples := d.sampleDataColumns(ctx, root, columnsToSample)
+		retrievedSamples := d.sampleDataColumns(ctx, blockProcessedData, columnsToSample)
 
 		missingSamples := make(map[uint64]bool)
 		for _, column := range columnsToSample {
@@ -325,7 +327,7 @@ func (d *dataColumnSampler1D) incrementalDAS(
 		if retrievedSampleCount == columnsToSampleCount {
 			// All columns were correctly sampled, this is the happy path.
 			log.WithFields(logrus.Fields{
-				"root":         fmt.Sprintf("%#x", root),
+				"root":         fmt.Sprintf("%#x", blockRoot),
 				"neededRounds": round,
 				"duration":     time.Since(start),
 			}).Debug("All columns were successfully sampled")
@@ -344,7 +346,7 @@ func (d *dataColumnSampler1D) incrementalDAS(
 		extendedSampleCount = peerdas.ExtendedSampleCount(sampleCount, allowedFailures)
 
 		log.WithFields(logrus.Fields{
-			"root":                fmt.Sprintf("%#x", root),
+			"root":                fmt.Sprintf("%#x", blockRoot),
 			"round":               round,
 			"missingColumnsCount": allowedFailures,
 			"currentSampleIndex":  oldExtendedSampleCount,
@@ -355,7 +357,7 @@ func (d *dataColumnSampler1D) incrementalDAS(
 
 func (d *dataColumnSampler1D) sampleDataColumns(
 	ctx context.Context,
-	root [fieldparams.RootLength]byte,
+	blockProcessedData *statefeed.BlockProcessedData,
 	columns []uint64,
 ) map[uint64]bool {
 	// Distribute samples to peers.
@@ -365,10 +367,12 @@ func (d *dataColumnSampler1D) sampleDataColumns(
 		mu sync.Mutex
 		wg sync.WaitGroup
 	)
+
 	res := make(map[uint64]bool)
+
 	sampleFromPeer := func(pid peer.ID, cols map[uint64]bool) {
 		defer wg.Done()
-		retrieved := d.sampleDataColumnsFromPeer(ctx, pid, root, cols)
+		retrieved := d.sampleDataColumnsFromPeer(ctx, pid, blockProcessedData, cols)
 
 		mu.Lock()
 		for col := range retrieved {
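Note: `sampleDataColumns` fans requests out to one goroutine per peer and merges results under a mutex. A self-contained sketch of that fan-out/merge pattern with a hypothetical `fetch` function (illustration only, not Prysm code):

package main

import (
	"fmt"
	"sync"
)

// fanOut queries each peer concurrently and merges retrieved columns into one set.
func fanOut(assignments map[string][]uint64, fetch func(peer string, cols []uint64) []uint64) map[uint64]bool {
	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	res := make(map[uint64]bool)

	for peer, cols := range assignments {
		wg.Add(1)
		go func(peer string, cols []uint64) {
			defer wg.Done()
			retrieved := fetch(peer, cols) // e.g. a data-column-sidecars-by-root RPC
			mu.Lock()
			for _, col := range retrieved {
				res[col] = true
			}
			mu.Unlock()
		}(peer, cols)
	}

	wg.Wait()
	return res
}

func main() {
	got := fanOut(
		map[string][]uint64{"peerA": {1, 2}, "peerB": {3}},
		func(_ string, cols []uint64) []uint64 { return cols }, // every request succeeds
	)
	fmt.Println(got) // map[1:true 2:true 3:true]
}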
@@ -414,15 +418,15 @@ func (d *dataColumnSampler1D) distributeSamplesToPeer(
 func (d *dataColumnSampler1D) sampleDataColumnsFromPeer(
 	ctx context.Context,
 	pid peer.ID,
-	root [fieldparams.RootLength]byte,
+	blockProcessedData *statefeed.BlockProcessedData,
 	requestedColumns map[uint64]bool,
 ) map[uint64]bool {
 	retrievedColumns := make(map[uint64]bool)
 
 	req := make(types.DataColumnSidecarsByRootReq, 0)
 	for col := range requestedColumns {
 		req = append(req, &eth.DataColumnIdentifier{
-			BlockRoot:   root[:],
+			BlockRoot:   blockProcessedData.BlockRoot[:],
 			ColumnIndex: col,
 		})
 	}
@@ -434,22 +438,23 @@ func (d *dataColumnSampler1D) sampleDataColumnsFromPeer(
 		return nil
 	}
 
+	// TODO: Once peer sampling is used, we should verify all sampled data columns in a single batch instead of looping over columns.
 	for _, roDataColumn := range roDataColumns {
-		if verifyColumn(roDataColumn, root, pid, requestedColumns, d.columnVerifier) {
+		if verifyColumn(roDataColumn, blockProcessedData, pid, requestedColumns, d.columnVerifier) {
 			retrievedColumns[roDataColumn.ColumnIndex] = true
 		}
 	}
 
 	if len(retrievedColumns) == len(requestedColumns) {
 		log.WithFields(logrus.Fields{
 			"peerID":           pid,
-			"root":             fmt.Sprintf("%#x", root),
+			"root":             fmt.Sprintf("%#x", blockProcessedData.BlockRoot),
 			"requestedColumns": sortedSliceFromMap(requestedColumns),
 		}).Debug("Sampled columns from peer successfully")
 	} else {
 		log.WithFields(logrus.Fields{
 			"peerID":           pid,
-			"root":             fmt.Sprintf("%#x", root),
+			"root":             fmt.Sprintf("%#x", blockProcessedData.BlockRoot),
 			"requestedColumns": sortedSliceFromMap(requestedColumns),
 			"retrievedColumns": sortedSliceFromMap(retrievedColumns),
 		}).Debug("Sampled columns from peer with some errors")
@@ -506,20 +511,22 @@ func selectRandomPeer(peers map[peer.ID]bool) peer.ID {
 // the KZG inclusion and the KZG proof.
 func verifyColumn(
 	roDataColumn blocks.RODataColumn,
-	root [32]byte,
+	blockProcessedData *statefeed.BlockProcessedData,
 	pid peer.ID,
 	requestedColumns map[uint64]bool,
-	columnVerifier verification.NewColumnVerifier,
+	dataColumnsVerifier verification.NewDataColumnsVerifier,
 ) bool {
 	retrievedColumn := roDataColumn.ColumnIndex
 
 	// Filter out columns with incorrect root.
-	actualRoot := roDataColumn.BlockRoot()
-	if actualRoot != root {
+	columnRoot := roDataColumn.BlockRoot()
+	blockRoot := blockProcessedData.BlockRoot
+
+	if columnRoot != blockRoot {
 		log.WithFields(logrus.Fields{
 			"peerID":        pid,
-			"requestedRoot": fmt.Sprintf("%#x", root),
-			"actualRoot":    fmt.Sprintf("%#x", actualRoot),
+			"requestedRoot": fmt.Sprintf("%#x", blockRoot),
+			"columnRoot":    fmt.Sprintf("%#x", columnRoot),
 		}).Debug("Retrieved root does not match requested root")
 
 		return false
@@ -538,25 +545,18 @@ func verifyColumn(
 		return false
 	}
 
-	vf := columnVerifier(roDataColumn, verification.SamplingColumnSidecarRequirements)
-	// Filter out columns which did not pass the KZG inclusion proof verification.
-	if err := vf.SidecarInclusionProven(); err != nil {
-		log.WithFields(logrus.Fields{
-			"peerID": pid,
-			"root":   fmt.Sprintf("%#x", root),
-			"index":  retrievedColumn,
-		}).WithError(err).Debug("Failed to verify KZG inclusion proof for retrieved column")
-		return false
+	roBlock := blockProcessedData.SignedBlock.Block()
+
+	wrappedBlockDataColumns := []verify.WrappedBlockDataColumn{
+		{
+			ROBlock:      roBlock,
+			RODataColumn: roDataColumn,
+		},
 	}
 
-	// Filter out columns which did not pass the KZG proof verification.
-	if err := vf.SidecarKzgProofVerified(); err != nil {
-		log.WithFields(logrus.Fields{
-			"peerID": pid,
-			"root":   fmt.Sprintf("%#x", root),
-			"index":  retrievedColumn,
-		}).WithError(err).Debug("Failed to verify KZG proof for retrieved column")
+	if err := verify.DataColumnsAlignWithBlock(wrappedBlockDataColumns, dataColumnsVerifier); err != nil {
 		return false
 	}
+
 	return true
 }
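Note: `verifyColumn` no longer runs the inclusion-proof and KZG checks inline; it wraps the column together with its block and delegates to `verify.DataColumnsAlignWithBlock`. Because that helper takes a slice, the per-column loop in `sampleDataColumnsFromPeer` can later collapse into one batched call, per the TODO above. A hedged sketch of that future batched flow, using only the API surface visible in this diff:

// Illustration of the intended batch verification (not part of this patch).
wrapped := make([]verify.WrappedBlockDataColumn, 0, len(roDataColumns))
for _, roDataColumn := range roDataColumns {
	wrapped = append(wrapped, verify.WrappedBlockDataColumn{
		ROBlock:      blockProcessedData.SignedBlock.Block(),
		RODataColumn: roDataColumn,
	})
}

// One call verifies every sampled column of the block together.
if err := verify.DataColumnsAlignWithBlock(wrapped, dataColumnsVerifier); err != nil {
	// At least one column failed the inclusion-proof or KZG checks.
}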