Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 44 additions & 12 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ use tokio_stream::Stream;
use tracing::{debug, error, info, trace, warn};
use tree_hash::TreeHash;
use types::blob_sidecar::FixedBlobSidecarList;
use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier};
use types::data_column_sidecar::ColumnIndex;
use types::payload::BlockProductionVersion;
use types::*;

Expand Down Expand Up @@ -1106,23 +1106,52 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_or_else(|| self.get_blobs(block_root), Ok)
}

pub fn get_data_column_checking_all_caches(
pub fn get_data_columns_checking_all_caches(
&self,
block_root: Hash256,
index: ColumnIndex,
) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, Error> {
if let Some(column) = self
indices: &[ColumnIndex],
) -> Result<Option<DataColumnSidecarList<T::EthSpec>>, Error> {
if indices.is_empty() {
return Ok(None);
}

let mut columns = vec![];
let mut indices: HashSet<ColumnIndex> = indices.iter().copied().collect();

if let Some(all_cols) = self
.data_availability_checker
.get_data_column(&DataColumnIdentifier { block_root, index })?
.get_data_columns(block_root)?
{
return Ok(Some(column));
let filtered = all_cols
.into_iter()
.filter(|col| indices.remove(&col.index))
.collect::<Vec<_>>();
columns.extend(filtered);

if indices.is_empty() {
return Ok(Some(columns));
}
}
if let Some(all_cols) = self.early_attester_cache.get_data_columns(block_root) {
let filtered = all_cols
.into_iter()
.filter(|col| indices.remove(&col.index))
.collect::<Vec<_>>();
columns.extend(filtered);

if let Some(columns) = self.early_attester_cache.get_data_columns(block_root) {
return Ok(columns.iter().find(|c| c.index == index).cloned());
if indices.is_empty() {
return Ok(Some(columns));
}
}
if let Some(filtered) = self.get_data_columns(&block_root, Some(&mut indices))? {
columns.extend(filtered)
};

self.get_data_column(&block_root, &index)
if columns.is_empty() {
Ok(None)
} else {
Ok(Some(columns))
}
}

/// Returns the block at the given root, if any.
Expand Down Expand Up @@ -1206,8 +1235,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn get_data_columns(
&self,
block_root: &Hash256,
indices: Option<&mut HashSet<ColumnIndex>>,
) -> Result<Option<DataColumnSidecarList<T::EthSpec>>, Error> {
self.store.get_data_columns(block_root).map_err(Error::from)
self.store
.get_data_columns(block_root, indices)
.map_err(Error::from)
}

/// Returns the blobs at the given root, if any.
Expand All @@ -1228,7 +1260,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
};

if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
if let Some(columns) = self.store.get_data_columns(block_root)? {
if let Some(columns) = self.store.get_data_columns(block_root, None)? {
let num_required_columns = self.spec.number_of_columns / 2;
let reconstruction_possible = columns.len() >= num_required_columns as usize;
if reconstruction_possible {
Expand Down
12 changes: 6 additions & 6 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use task_executor::TaskExecutor;
use tracing::{debug, error, info_span, Instrument};
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{
BlobSidecarList, ChainSpec, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList,
Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock,
BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, Hash256,
RuntimeVariableList, SignedBeaconBlock,
};

mod error;
Expand Down Expand Up @@ -164,11 +164,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
}

/// Get a data column from the availability cache.
pub fn get_data_column(
pub fn get_data_columns(
&self,
data_column_id: &DataColumnIdentifier,
) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, AvailabilityCheckError> {
self.availability_cache.peek_data_column(data_column_id)
block_root: Hash256,
) -> Result<Option<DataColumnSidecarList<T::EthSpec>>, AvailabilityCheckError> {
self.availability_cache.peek_data_columns(block_root)
}

/// Put a list of blobs received via RPC into the availability cache. This performs KZG
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;
use tracing::debug;
use types::blob_sidecar::BlobIdentifier;
use types::{
BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar, Epoch, EthSpec,
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec,
Hash256, RuntimeFixedVector, RuntimeVariableList, SignedBeaconBlock,
};

Expand Down Expand Up @@ -404,17 +404,19 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
}
}

/// Fetch a data column from the cache without affecting the LRU ordering
pub fn peek_data_column(
/// Fetch data columns of a given `block_root` from the cache without affecting the LRU ordering
pub fn peek_data_columns(
&self,
data_column_id: &DataColumnIdentifier,
) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, AvailabilityCheckError> {
if let Some(pending_components) = self.critical.read().peek(&data_column_id.block_root) {
Ok(pending_components
.verified_data_columns
.iter()
.find(|data_column| data_column.as_data_column().index == data_column_id.index)
.map(|data_column| data_column.clone_arc()))
block_root: Hash256,
) -> Result<Option<DataColumnSidecarList<T::EthSpec>>, AvailabilityCheckError> {
if let Some(pending_components) = self.critical.read().peek(&block_root) {
Ok(Some(
pending_components
.verified_data_columns
.iter()
.map(|col| col.clone_arc())
.collect(),
))
} else {
Ok(None)
}
Expand Down
11 changes: 2 additions & 9 deletions beacon_node/beacon_chain/src/data_column_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::iter;
use std::marker::PhantomData;
use std::sync::Arc;
use tracing::debug;
use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier};
use types::data_column_sidecar::ColumnIndex;
use types::{
BeaconStateError, ChainSpec, DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256,
RuntimeVariableList, SignedBeaconBlockHeader, Slot,
Expand Down Expand Up @@ -200,13 +200,6 @@ impl<T: BeaconChainTypes, O: ObservationStrategy> GossipVerifiedDataColumn<T, O>
)
}

pub fn id(&self) -> DataColumnIdentifier {
DataColumnIdentifier {
block_root: self.block_root,
index: self.data_column.index(),
}
}

pub fn as_data_column(&self) -> &DataColumnSidecar<T::EthSpec> {
self.data_column.as_data_column()
}
Expand Down Expand Up @@ -741,7 +734,7 @@ pub fn observe_gossip_data_column<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
) -> Result<(), GossipDataColumnError> {
// Now the signature is valid, store the proposal so we don't accept another data column sidecar
// with the same `DataColumnIdentifier`. It's important to double-check that the proposer still
// with the same `ColumnIndex`. It's important to double-check that the proposer still
// hasn't been observed so we don't have a race-condition when verifying two blocks
// simultaneously.
//
Expand Down
6 changes: 5 additions & 1 deletion beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2371,7 +2371,11 @@ where

// Blobs are stored as data columns from Fulu (PeerDAS)
if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
let columns = self.chain.get_data_columns(&block_root).unwrap().unwrap();
let columns = self
.chain
.get_data_columns(&block_root, None)
.unwrap()
.unwrap();
let custody_columns = columns
.into_iter()
.map(CustodyDataColumn::from_asserted_custody)
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/tests/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async fn get_chain_segment() -> (Vec<BeaconSnapshot<E>>, Vec<Option<DataSidecars
let data_sidecars = if harness.spec.is_peer_das_enabled_for_epoch(block_epoch) {
harness
.chain
.get_data_columns(&snapshot.beacon_block_root)
.get_data_columns(&snapshot.beacon_block_root, None)
.unwrap()
.map(|columns| {
columns
Expand Down
36 changes: 20 additions & 16 deletions beacon_node/lighthouse_network/src/rpc/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ use std::marker::PhantomData;
use std::sync::Arc;
use tokio_util::codec::{Decoder, Encoder};
use types::{
BlobSidecar, ChainSpec, DataColumnSidecar, EthSpec, ForkContext, ForkName, Hash256,
LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate,
LightClientUpdate, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockAltair,
SignedBeaconBlockBase, SignedBeaconBlockBellatrix, SignedBeaconBlockCapella,
SignedBeaconBlockDeneb, SignedBeaconBlockElectra, SignedBeaconBlockFulu,
BlobSidecar, ChainSpec, DataColumnSidecar, DataColumnsByRootIdentifier, EthSpec, ForkContext,
ForkName, Hash256, LightClientBootstrap, LightClientFinalityUpdate,
LightClientOptimisticUpdate, LightClientUpdate, RuntimeVariableList, SignedBeaconBlock,
SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix,
SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockElectra,
SignedBeaconBlockFulu,
};
use unsigned_varint::codec::Uvi;

Expand Down Expand Up @@ -596,10 +597,12 @@ fn handle_rpc_request<E: EthSpec>(
))),
SupportedProtocol::DataColumnsByRootV1 => Ok(Some(RequestType::DataColumnsByRoot(
DataColumnsByRootRequest {
data_column_ids: RuntimeVariableList::from_ssz_bytes(
decoded_buffer,
spec.max_request_data_column_sidecars as usize,
)?,
data_column_ids:
<RuntimeVariableList<DataColumnsByRootIdentifier>>::from_ssz_bytes_with_nested(
decoded_buffer,
spec.max_request_blocks(current_fork),
spec.number_of_columns as usize,
)?,
},
))),
SupportedProtocol::PingV1 => Ok(Some(RequestType::Ping(Ping {
Expand Down Expand Up @@ -935,8 +938,8 @@ mod tests {
use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield};
use types::{
blob_sidecar::BlobIdentifier, data_column_sidecar::Cell, BeaconBlock, BeaconBlockAltair,
BeaconBlockBase, BeaconBlockBellatrix, BeaconBlockHeader, DataColumnIdentifier, EmptyBlock,
Epoch, FixedBytesExtended, FullPayload, KzgCommitment, KzgProof, Signature,
BeaconBlockBase, BeaconBlockBellatrix, BeaconBlockHeader, DataColumnsByRootIdentifier,
EmptyBlock, Epoch, FixedBytesExtended, FullPayload, KzgCommitment, KzgProof, Signature,
SignedBeaconBlockHeader, Slot,
};

Expand Down Expand Up @@ -1066,14 +1069,15 @@ mod tests {
}
}

fn dcbroot_request(spec: &ChainSpec) -> DataColumnsByRootRequest {
fn dcbroot_request(spec: &ChainSpec, fork_name: ForkName) -> DataColumnsByRootRequest {
let number_of_columns = spec.number_of_columns as usize;
DataColumnsByRootRequest {
data_column_ids: RuntimeVariableList::new(
vec![DataColumnIdentifier {
vec![DataColumnsByRootIdentifier {
block_root: Hash256::zero(),
index: 0,
columns: RuntimeVariableList::from_vec(vec![0, 1, 2], number_of_columns),
}],
spec.max_request_data_column_sidecars as usize,
spec.max_request_blocks(fork_name),
)
.unwrap(),
}
Expand Down Expand Up @@ -1904,7 +1908,6 @@ mod tests {
RequestType::MetaData(MetadataRequest::new_v1()),
RequestType::BlobsByRange(blbrange_request()),
RequestType::DataColumnsByRange(dcbrange_request()),
RequestType::DataColumnsByRoot(dcbroot_request(&chain_spec)),
RequestType::MetaData(MetadataRequest::new_v2()),
];
for req in requests.iter() {
Expand All @@ -1920,6 +1923,7 @@ mod tests {
RequestType::BlobsByRoot(blbroot_request(fork_name)),
RequestType::BlocksByRoot(bbroot_request_v1(fork_name)),
RequestType::BlocksByRoot(bbroot_request_v2(fork_name)),
RequestType::DataColumnsByRoot(dcbroot_request(&chain_spec, fork_name)),
]
};
for fork_name in ForkName::list_all() {
Expand Down
29 changes: 7 additions & 22 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use serde::Serialize;
use ssz::Encode;
use ssz_derive::{Decode, Encode};
use ssz_types::{typenum::U256, VariableList};
use std::collections::BTreeMap;
use std::fmt::Display;
use std::marker::PhantomData;
use std::ops::Deref;
Expand All @@ -16,9 +15,10 @@ use superstruct::superstruct;
use types::blob_sidecar::BlobIdentifier;
use types::light_client_update::MAX_REQUEST_LIGHT_CLIENT_UPDATES;
use types::{
blob_sidecar::BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar,
Epoch, EthSpec, Hash256, LightClientBootstrap, LightClientFinalityUpdate,
LightClientOptimisticUpdate, LightClientUpdate, RuntimeVariableList, SignedBeaconBlock, Slot,
blob_sidecar::BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar,
DataColumnsByRootIdentifier, Epoch, EthSpec, Hash256, LightClientBootstrap,
LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, RuntimeVariableList,
SignedBeaconBlock, Slot,
};
use types::{ForkContext, ForkName};

Expand Down Expand Up @@ -479,32 +479,17 @@ impl BlobsByRootRequest {
#[derive(Clone, Debug, PartialEq)]
pub struct DataColumnsByRootRequest {
/// The list of beacon block roots and column indices being requested.
pub data_column_ids: RuntimeVariableList<DataColumnIdentifier>,
pub data_column_ids: RuntimeVariableList<DataColumnsByRootIdentifier>,
}

impl DataColumnsByRootRequest {
pub fn new(data_column_ids: Vec<DataColumnIdentifier>, spec: &ChainSpec) -> Self {
pub fn new(data_column_ids: Vec<DataColumnsByRootIdentifier>, spec: &ChainSpec) -> Self {
let data_column_ids = RuntimeVariableList::from_vec(
data_column_ids,
spec.max_request_data_column_sidecars as usize,
spec.max_request_blocks(ForkName::Deneb),
);
Self { data_column_ids }
}

pub fn new_single(block_root: Hash256, index: ColumnIndex, spec: &ChainSpec) -> Self {
Self::new(vec![DataColumnIdentifier { block_root, index }], spec)
}

pub fn group_by_ordered_block_root(&self) -> Vec<(Hash256, Vec<ColumnIndex>)> {
let mut column_indexes_by_block = BTreeMap::<Hash256, Vec<ColumnIndex>>::new();
for request_id in self.data_column_ids.as_slice() {
column_indexes_by_block
.entry(request_id.block_root)
.or_default()
.push(request_id.index);
}
column_indexes_by_block.into_iter().collect()
}
}

/// Request a number of beacon data columns from a peer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1130,7 +1130,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let processing_start_time = Instant::now();
let block_root = verified_data_column.block_root();
let data_column_slot = verified_data_column.slot();
let data_column_index = verified_data_column.id().index;
let data_column_index = verified_data_column.index();

let result = self
.chain
Expand Down
Loading