Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 106 additions & 27 deletions beacon_node/network/src/network_beacon_processor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ use crate::{
sync::{manager::BlockProcessType, SyncMessage},
};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::kzg_utils::blobs_to_data_column_sidecars;
use beacon_chain::test_utils::{
test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType,
get_kzg, test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy,
EphemeralHarnessType,
};
use beacon_chain::{BeaconChain, WhenSlotSkipped};
use beacon_processor::{work_reprocessing_queue::*, *};
use itertools::Itertools;
use lighthouse_network::discovery::ConnectionId;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3};
use lighthouse_network::rpc::{RequestId, SubstreamId};
Expand All @@ -30,9 +33,9 @@ use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, Epoch, Hash256, MainnetEthSpec,
ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot,
SubnetId,
Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, DataColumnSidecarList,
DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId,
};

type E = MainnetEthSpec;
Expand All @@ -53,6 +56,7 @@ struct TestRig {
chain: Arc<BeaconChain<T>>,
next_block: Arc<SignedBeaconBlock<E>>,
next_blobs: Option<BlobSidecarList<E>>,
next_data_columns: Option<DataColumnSidecarList<E>>,
attestations: Vec<(Attestation<E>, SubnetId)>,
next_block_attestations: Vec<(Attestation<E>, SubnetId)>,
next_block_aggregate_attestations: Vec<SignedAggregateAndProof<E>>,
Expand Down Expand Up @@ -242,7 +246,7 @@ impl TestRig {
let network_beacon_processor = Arc::new(network_beacon_processor);

let beacon_processor = BeaconProcessor {
network_globals,
network_globals: network_globals.clone(),
executor,
current_workers: 0,
config: beacon_processor_config,
Expand All @@ -263,15 +267,35 @@ impl TestRig {

assert!(beacon_processor.is_ok());
let block = next_block_tuple.0;
let blob_sidecars = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 {
Some(BlobSidecar::build_sidecars(blobs, &block, kzg_proofs, &chain.spec).unwrap())
let (blob_sidecars, data_columns) = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 {
if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
let kzg = get_kzg(&chain.spec);
let custody_columns: DataColumnSidecarList<E> = blobs_to_data_column_sidecars(
&blobs.iter().collect_vec(),
&block,
&kzg,
&chain.spec,
)
.unwrap()
.into_iter()
.filter(|c| network_globals.sampling_columns.contains(&c.index))
.collect::<Vec<_>>();

(None, Some(custody_columns))
} else {
let blob_sidecars =
BlobSidecar::build_sidecars(blobs, &block, kzg_proofs, &chain.spec).unwrap();
(Some(blob_sidecars), None)
}
} else {
None
(None, None)
};

Self {
chain,
next_block: block,
next_blobs: blob_sidecars,
next_data_columns: data_columns,
attestations,
next_block_attestations,
next_block_aggregate_attestations,
Expand Down Expand Up @@ -324,6 +348,22 @@ impl TestRig {
}
}

pub fn enqueue_gossip_data_columns(&self, col_index: usize) {
if let Some(data_columns) = self.next_data_columns.as_ref() {
let data_column = data_columns.get(col_index).unwrap();
self.network_beacon_processor
.send_gossip_data_column_sidecar(
junk_message_id(),
junk_peer_id(),
Client::default(),
DataColumnSubnetId::from_column_index(data_column.index, &self.chain.spec),
data_column.clone(),
Duration::from_secs(0),
)
.unwrap();
}
}

pub fn enqueue_rpc_block(&self) {
let block_root = self.next_block.canonical_root();
self.network_beacon_processor
Expand Down Expand Up @@ -362,6 +402,19 @@ impl TestRig {
}
}

pub fn enqueue_single_lookup_rpc_data_columns(&self) {
if let Some(data_columns) = self.next_data_columns.clone() {
self.network_beacon_processor
.send_rpc_custody_columns(
self.next_block.canonical_root(),
data_columns,
Duration::default(),
BlockProcessType::SingleCustodyColumn(1),
)
.unwrap();
}
}

pub fn enqueue_blobs_by_range_request(&self, count: u64) {
self.network_beacon_processor
.send_blobs_by_range_request(
Expand Down Expand Up @@ -621,6 +674,13 @@ async fn import_gossip_block_acceptably_early() {
.await;
}

let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
.await;
}

// Note: this section of the code is a bit race-y. We're assuming that we can set the slot clock
// and check the head in the time between the block arrived early and when its due for
// processing.
Expand Down Expand Up @@ -697,19 +757,20 @@ async fn import_gossip_block_at_current_slot() {
rig.assert_event_journal_completes(&[WorkType::GossipBlock])
.await;

let num_blobs = rig
.next_blobs
.as_ref()
.map(|blobs| blobs.len())
.unwrap_or(0);

let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
for i in 0..num_blobs {
rig.enqueue_gossip_blob(i);

rig.assert_event_journal_completes(&[WorkType::GossipBlobSidecar])
.await;
}

let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
.await;
}

assert_eq!(
rig.head_root(),
rig.next_block.canonical_root(),
Expand Down Expand Up @@ -762,11 +823,8 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
);

// Send the block and ensure that the attestation is received back and imported.
let num_blobs = rig
.next_blobs
.as_ref()
.map(|blobs| blobs.len())
.unwrap_or(0);
let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
let mut events = vec![];
match import_method {
BlockImportMethod::Gossip => {
Expand All @@ -776,6 +834,10 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
rig.enqueue_gossip_blob(i);
events.push(WorkType::GossipBlobSidecar);
}
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
events.push(WorkType::GossipDataColumnSidecar);
}
}
BlockImportMethod::Rpc => {
rig.enqueue_rpc_block();
Expand All @@ -784,6 +846,10 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
rig.enqueue_single_lookup_rpc_blobs();
events.push(WorkType::RpcBlobs);
}
if num_data_columns > 0 {
rig.enqueue_single_lookup_rpc_data_columns();
events.push(WorkType::RpcCustodyColumn);
}
}
};

Expand Down Expand Up @@ -843,11 +909,8 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
);

// Send the block and ensure that the attestation is received back and imported.
let num_blobs = rig
.next_blobs
.as_ref()
.map(|blobs| blobs.len())
.unwrap_or(0);
let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
let mut events = vec![];
match import_method {
BlockImportMethod::Gossip => {
Expand All @@ -857,6 +920,10 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
rig.enqueue_gossip_blob(i);
events.push(WorkType::GossipBlobSidecar);
}
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
events.push(WorkType::GossipDataColumnSidecar)
}
}
BlockImportMethod::Rpc => {
rig.enqueue_rpc_block();
Expand All @@ -865,6 +932,10 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
rig.enqueue_single_lookup_rpc_blobs();
events.push(WorkType::RpcBlobs);
}
if num_data_columns > 0 {
rig.enqueue_single_lookup_rpc_data_columns();
events.push(WorkType::RpcCustodyColumn);
}
}
};

Expand Down Expand Up @@ -1049,12 +1120,20 @@ async fn test_rpc_block_reprocessing() {
rig.assert_event_journal_completes(&[WorkType::RpcBlock])
.await;

rig.enqueue_single_lookup_rpc_blobs();
if rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0) > 0 {
let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
if num_blobs > 0 {
rig.enqueue_single_lookup_rpc_blobs();
rig.assert_event_journal_completes(&[WorkType::RpcBlobs])
.await;
}

let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
if num_data_columns > 0 {
rig.enqueue_single_lookup_rpc_data_columns();
rig.assert_event_journal_completes(&[WorkType::RpcCustodyColumn])
.await;
}

// next_block shouldn't be processed since it couldn't get the
// duplicate cache handle
assert_ne!(next_block_root, rig.head_root());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 2
# MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA
MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 256

# DAS
# Fulu
NUMBER_OF_COLUMNS: 128
NUMBER_OF_CUSTODY_GROUPS: 128
DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128
SAMPLES_PER_SLOT: 8
CUSTODY_REQUIREMENT: 4
MAX_BLOBS_PER_BLOCK_FULU: 12
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,10 @@ BLOB_SIDECAR_SUBNET_COUNT: 6
# `uint64(6)`
MAX_BLOBS_PER_BLOCK: 6

# DAS
# Fulu
NUMBER_OF_COLUMNS: 128
NUMBER_OF_CUSTODY_GROUPS: 128
DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128
SAMPLES_PER_SLOT: 8
CUSTODY_REQUIREMENT: 4
MAX_BLOBS_PER_BLOCK_FULU: 12
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 9
# MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA
MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 1152

# DAS
# Fulu
NUMBER_OF_COLUMNS: 128
NUMBER_OF_CUSTODY_GROUPS: 128
DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128
SAMPLES_PER_SLOT: 8
CUSTODY_REQUIREMENT: 4
MAX_BLOBS_PER_BLOCK_FULU: 12
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,10 @@ BLOB_SIDECAR_SUBNET_COUNT: 6
# `uint64(6)`
MAX_BLOBS_PER_BLOCK: 6

# DAS
# Fulu
NUMBER_OF_COLUMNS: 128
NUMBER_OF_CUSTODY_GROUPS: 128
DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128
SAMPLES_PER_SLOT: 8
CUSTODY_REQUIREMENT: 4
MAX_BLOBS_PER_BLOCK_FULU: 12
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 9
# MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA
MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 1152

# DAS
# Fulu
NUMBER_OF_COLUMNS: 128
NUMBER_OF_CUSTODY_GROUPS: 128
DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128
SAMPLES_PER_SLOT: 8
CUSTODY_REQUIREMENT: 4
MAX_BLOBS_PER_BLOCK_FULU: 12
Loading
Loading