@@ -9,11 +9,14 @@ use crate::{
99 sync:: { manager:: BlockProcessType , SyncMessage } ,
1010} ;
1111use beacon_chain:: block_verification_types:: RpcBlock ;
12+ use beacon_chain:: kzg_utils:: blobs_to_data_column_sidecars;
1213use beacon_chain:: test_utils:: {
13- test_spec, AttestationStrategy , BeaconChainHarness , BlockStrategy , EphemeralHarnessType ,
14+ get_kzg, test_spec, AttestationStrategy , BeaconChainHarness , BlockStrategy ,
15+ EphemeralHarnessType ,
1416} ;
1517use beacon_chain:: { BeaconChain , WhenSlotSkipped } ;
1618use beacon_processor:: { work_reprocessing_queue:: * , * } ;
19+ use itertools:: Itertools ;
1720use lighthouse_network:: rpc:: methods:: { BlobsByRangeRequest , MetaDataV3 } ;
1821use lighthouse_network:: rpc:: InboundRequestId ;
1922use lighthouse_network:: {
@@ -29,9 +32,9 @@ use std::time::Duration;
2932use tokio:: sync:: mpsc;
3033use types:: blob_sidecar:: FixedBlobSidecarList ;
3134use types:: {
32- Attestation , AttesterSlashing , BlobSidecar , BlobSidecarList , Epoch , Hash256 , MainnetEthSpec ,
33- ProposerSlashing , SignedAggregateAndProof , SignedBeaconBlock , SignedVoluntaryExit , Slot ,
34- SubnetId ,
35+ Attestation , AttesterSlashing , BlobSidecar , BlobSidecarList , DataColumnSidecarList ,
36+ DataColumnSubnetId , Epoch , Hash256 , MainnetEthSpec , ProposerSlashing , SignedAggregateAndProof ,
37+ SignedBeaconBlock , SignedVoluntaryExit , Slot , SubnetId ,
3538} ;
3639
3740type E = MainnetEthSpec ;
@@ -52,6 +55,7 @@ struct TestRig {
5255 chain : Arc < BeaconChain < T > > ,
5356 next_block : Arc < SignedBeaconBlock < E > > ,
5457 next_blobs : Option < BlobSidecarList < E > > ,
58+ next_data_columns : Option < DataColumnSidecarList < E > > ,
5559 attestations : Vec < ( Attestation < E > , SubnetId ) > ,
5660 next_block_attestations : Vec < ( Attestation < E > , SubnetId ) > ,
5761 next_block_aggregate_attestations : Vec < SignedAggregateAndProof < E > > ,
@@ -241,7 +245,7 @@ impl TestRig {
241245 let network_beacon_processor = Arc :: new ( network_beacon_processor) ;
242246
243247 let beacon_processor = BeaconProcessor {
244- network_globals,
248+ network_globals : network_globals . clone ( ) ,
245249 executor,
246250 current_workers : 0 ,
247251 config : beacon_processor_config,
@@ -262,15 +266,36 @@ impl TestRig {
262266
263267 assert ! ( beacon_processor. is_ok( ) ) ;
264268 let block = next_block_tuple. 0 ;
265- let blob_sidecars = if let Some ( ( kzg_proofs, blobs) ) = next_block_tuple. 1 {
266- Some ( BlobSidecar :: build_sidecars ( blobs, & block, kzg_proofs, & chain. spec ) . unwrap ( ) )
269+ let ( blob_sidecars, data_columns) = if let Some ( ( kzg_proofs, blobs) ) = next_block_tuple. 1 {
270+ if chain. spec . is_peer_das_enabled_for_epoch ( block. epoch ( ) ) {
271+ let kzg = get_kzg ( & chain. spec ) ;
272+ let custody_columns: DataColumnSidecarList < E > = blobs_to_data_column_sidecars (
273+ & blobs. iter ( ) . collect_vec ( ) ,
274+ kzg_proofs. clone ( ) . into_iter ( ) . collect_vec ( ) ,
275+ & block,
276+ & kzg,
277+ & chain. spec ,
278+ )
279+ . unwrap ( )
280+ . into_iter ( )
281+ . filter ( |c| network_globals. sampling_columns . contains ( & c. index ) )
282+ . collect :: < Vec < _ > > ( ) ;
283+
284+ ( None , Some ( custody_columns) )
285+ } else {
286+ let blob_sidecars =
287+ BlobSidecar :: build_sidecars ( blobs, & block, kzg_proofs, & chain. spec ) . unwrap ( ) ;
288+ ( Some ( blob_sidecars) , None )
289+ }
267290 } else {
268- None
291+ ( None , None )
269292 } ;
293+
270294 Self {
271295 chain,
272296 next_block : block,
273297 next_blobs : blob_sidecars,
298+ next_data_columns : data_columns,
274299 attestations,
275300 next_block_attestations,
276301 next_block_aggregate_attestations,
@@ -323,6 +348,22 @@ impl TestRig {
323348 }
324349 }
325350
351+ pub fn enqueue_gossip_data_columns ( & self , col_index : usize ) {
352+ if let Some ( data_columns) = self . next_data_columns . as_ref ( ) {
353+ let data_column = data_columns. get ( col_index) . unwrap ( ) ;
354+ self . network_beacon_processor
355+ . send_gossip_data_column_sidecar (
356+ junk_message_id ( ) ,
357+ junk_peer_id ( ) ,
358+ Client :: default ( ) ,
359+ DataColumnSubnetId :: from_column_index ( data_column. index , & self . chain . spec ) ,
360+ data_column. clone ( ) ,
361+ Duration :: from_secs ( 0 ) ,
362+ )
363+ . unwrap ( ) ;
364+ }
365+ }
366+
326367 pub fn custody_columns_count ( & self ) -> usize {
327368 self . network_beacon_processor
328369 . network_globals
@@ -375,6 +416,19 @@ impl TestRig {
375416 }
376417 }
377418
419+ pub fn enqueue_single_lookup_rpc_data_columns ( & self ) {
420+ if let Some ( data_columns) = self . next_data_columns . clone ( ) {
421+ self . network_beacon_processor
422+ . send_rpc_custody_columns (
423+ self . next_block . canonical_root ( ) ,
424+ data_columns,
425+ Duration :: default ( ) ,
426+ BlockProcessType :: SingleCustodyColumn ( 1 ) ,
427+ )
428+ . unwrap ( ) ;
429+ }
430+ }
431+
378432 pub fn enqueue_blobs_by_range_request ( & self , count : u64 ) {
379433 self . network_beacon_processor
380434 . send_blobs_by_range_request (
@@ -632,6 +686,13 @@ async fn import_gossip_block_acceptably_early() {
632686 . await ;
633687 }
634688
689+ let num_data_columns = rig. next_data_columns . as_ref ( ) . map ( |c| c. len ( ) ) . unwrap_or ( 0 ) ;
690+ for i in 0 ..num_data_columns {
691+ rig. enqueue_gossip_data_columns ( i) ;
692+ rig. assert_event_journal_completes ( & [ WorkType :: GossipDataColumnSidecar ] )
693+ . await ;
694+ }
695+
635696 // Note: this section of the code is a bit race-y. We're assuming that we can set the slot clock
636697 // and check the head in the time between the block arrived early and when its due for
637698 // processing.
@@ -708,19 +769,20 @@ async fn import_gossip_block_at_current_slot() {
708769 rig. assert_event_journal_completes ( & [ WorkType :: GossipBlock ] )
709770 . await ;
710771
711- let num_blobs = rig
712- . next_blobs
713- . as_ref ( )
714- . map ( |blobs| blobs. len ( ) )
715- . unwrap_or ( 0 ) ;
716-
772+ let num_blobs = rig. next_blobs . as_ref ( ) . map ( |b| b. len ( ) ) . unwrap_or ( 0 ) ;
717773 for i in 0 ..num_blobs {
718774 rig. enqueue_gossip_blob ( i) ;
719-
720775 rig. assert_event_journal_completes ( & [ WorkType :: GossipBlobSidecar ] )
721776 . await ;
722777 }
723778
779+ let num_data_columns = rig. next_data_columns . as_ref ( ) . map ( |c| c. len ( ) ) . unwrap_or ( 0 ) ;
780+ for i in 0 ..num_data_columns {
781+ rig. enqueue_gossip_data_columns ( i) ;
782+ rig. assert_event_journal_completes ( & [ WorkType :: GossipDataColumnSidecar ] )
783+ . await ;
784+ }
785+
724786 assert_eq ! (
725787 rig. head_root( ) ,
726788 rig. next_block. canonical_root( ) ,
@@ -773,11 +835,8 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
773835 ) ;
774836
775837 // Send the block and ensure that the attestation is received back and imported.
776- let num_blobs = rig
777- . next_blobs
778- . as_ref ( )
779- . map ( |blobs| blobs. len ( ) )
780- . unwrap_or ( 0 ) ;
838+ let num_blobs = rig. next_blobs . as_ref ( ) . map ( |b| b. len ( ) ) . unwrap_or ( 0 ) ;
839+ let num_data_columns = rig. next_data_columns . as_ref ( ) . map ( |c| c. len ( ) ) . unwrap_or ( 0 ) ;
781840 let mut events = vec ! [ ] ;
782841 match import_method {
783842 BlockImportMethod :: Gossip => {
@@ -787,6 +846,10 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
787846 rig. enqueue_gossip_blob ( i) ;
788847 events. push ( WorkType :: GossipBlobSidecar ) ;
789848 }
849+ for i in 0 ..num_data_columns {
850+ rig. enqueue_gossip_data_columns ( i) ;
851+ events. push ( WorkType :: GossipDataColumnSidecar ) ;
852+ }
790853 }
791854 BlockImportMethod :: Rpc => {
792855 rig. enqueue_rpc_block ( ) ;
@@ -795,6 +858,10 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
795858 rig. enqueue_single_lookup_rpc_blobs ( ) ;
796859 events. push ( WorkType :: RpcBlobs ) ;
797860 }
861+ if num_data_columns > 0 {
862+ rig. enqueue_single_lookup_rpc_data_columns ( ) ;
863+ events. push ( WorkType :: RpcCustodyColumn ) ;
864+ }
798865 }
799866 } ;
800867
@@ -854,11 +921,8 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
854921 ) ;
855922
856923 // Send the block and ensure that the attestation is received back and imported.
857- let num_blobs = rig
858- . next_blobs
859- . as_ref ( )
860- . map ( |blobs| blobs. len ( ) )
861- . unwrap_or ( 0 ) ;
924+ let num_blobs = rig. next_blobs . as_ref ( ) . map ( |b| b. len ( ) ) . unwrap_or ( 0 ) ;
925+ let num_data_columns = rig. next_data_columns . as_ref ( ) . map ( |c| c. len ( ) ) . unwrap_or ( 0 ) ;
862926 let mut events = vec ! [ ] ;
863927 match import_method {
864928 BlockImportMethod :: Gossip => {
@@ -868,6 +932,10 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
868932 rig. enqueue_gossip_blob ( i) ;
869933 events. push ( WorkType :: GossipBlobSidecar ) ;
870934 }
935+ for i in 0 ..num_data_columns {
936+ rig. enqueue_gossip_data_columns ( i) ;
937+ events. push ( WorkType :: GossipDataColumnSidecar )
938+ }
871939 }
872940 BlockImportMethod :: Rpc => {
873941 rig. enqueue_rpc_block ( ) ;
@@ -876,6 +944,10 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
876944 rig. enqueue_single_lookup_rpc_blobs ( ) ;
877945 events. push ( WorkType :: RpcBlobs ) ;
878946 }
947+ if num_data_columns > 0 {
948+ rig. enqueue_single_lookup_rpc_data_columns ( ) ;
949+ events. push ( WorkType :: RpcCustodyColumn ) ;
950+ }
879951 }
880952 } ;
881953
@@ -1060,12 +1132,20 @@ async fn test_rpc_block_reprocessing() {
10601132 rig. assert_event_journal_completes ( & [ WorkType :: RpcBlock ] )
10611133 . await ;
10621134
1063- rig. enqueue_single_lookup_rpc_blobs ( ) ;
1064- if rig. next_blobs . as_ref ( ) . map ( |b| b. len ( ) ) . unwrap_or ( 0 ) > 0 {
1135+ let num_blobs = rig. next_blobs . as_ref ( ) . map ( |b| b. len ( ) ) . unwrap_or ( 0 ) ;
1136+ if num_blobs > 0 {
1137+ rig. enqueue_single_lookup_rpc_blobs ( ) ;
10651138 rig. assert_event_journal_completes ( & [ WorkType :: RpcBlobs ] )
10661139 . await ;
10671140 }
10681141
1142+ let num_data_columns = rig. next_data_columns . as_ref ( ) . map ( |c| c. len ( ) ) . unwrap_or ( 0 ) ;
1143+ if num_data_columns > 0 {
1144+ rig. enqueue_single_lookup_rpc_data_columns ( ) ;
1145+ rig. assert_event_journal_completes ( & [ WorkType :: RpcCustodyColumn ] )
1146+ . await ;
1147+ }
1148+
10691149 // next_block shouldn't be processed since it couldn't get the
10701150 // duplicate cache handle
10711151 assert_ne ! ( next_block_root, rig. head_root( ) ) ;
0 commit comments