Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ use tokio_stream::Stream;
use tracing::{debug, error, info, trace, warn};
use tree_hash::TreeHash;
use types::blob_sidecar::FixedBlobSidecarList;
use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier};
use types::data_column_sidecar::ColumnIndex;
use types::payload::BlockProductionVersion;
use types::*;

Expand Down Expand Up @@ -1106,23 +1106,33 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_or_else(|| self.get_blobs(block_root), Ok)
}

pub fn get_data_column_checking_all_caches(
pub fn get_data_columns_checking_all_caches(
&self,
block_root: Hash256,
index: ColumnIndex,
) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, Error> {
if let Some(column) = self
indices: &[ColumnIndex],
) -> Result<Option<DataColumnSidecarList<T::EthSpec>>, Error> {
let columns = if let Some(all_cols) = self
.data_availability_checker
.get_data_column(&DataColumnIdentifier { block_root, index })?
.get_data_columns(block_root)?
{
return Ok(Some(column));
}
Some(all_cols)
} else if let Some(all_cols) = self.early_attester_cache.get_data_columns(block_root) {
Some(all_cols)
} else {
self.get_data_columns(&block_root)?
};

if let Some(columns) = self.early_attester_cache.get_data_columns(block_root) {
return Ok(columns.iter().find(|c| c.index == index).cloned());
let indices: HashSet<_> = indices.iter().clone().collect();
if let Some(columns) = columns {
return Ok(Some(
columns
.into_iter()
.filter(|col| indices.contains(&col.index))
.collect(),
));
}

self.get_data_column(&block_root, &index)
Ok(columns)
}

/// Returns the block at the given root, if any.
Expand Down
12 changes: 6 additions & 6 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use task_executor::TaskExecutor;
use tracing::{debug, error, info_span, Instrument};
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{
BlobSidecarList, ChainSpec, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList,
Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock,
BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, Hash256,
RuntimeVariableList, SignedBeaconBlock,
};

mod error;
Expand Down Expand Up @@ -164,11 +164,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
}

/// Get a data column from the availability cache.
pub fn get_data_column(
pub fn get_data_columns(
&self,
data_column_id: &DataColumnIdentifier,
) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, AvailabilityCheckError> {
self.availability_cache.peek_data_column(data_column_id)
block_root: Hash256,
) -> Result<Option<DataColumnSidecarList<T::EthSpec>>, AvailabilityCheckError> {
self.availability_cache.peek_data_columns(block_root)
}

/// Put a list of blobs received via RPC into the availability cache. This performs KZG
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;
use tracing::debug;
use types::blob_sidecar::BlobIdentifier;
use types::{
BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar, Epoch, EthSpec,
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec,
Hash256, RuntimeFixedVector, RuntimeVariableList, SignedBeaconBlock,
};

Expand Down Expand Up @@ -404,17 +404,19 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
}
}

/// Fetch a data column from the cache without affecting the LRU ordering
pub fn peek_data_column(
/// Fetch data columns of a given `block_root` from the cache without affecting the LRU ordering
pub fn peek_data_columns(
&self,
data_column_id: &DataColumnIdentifier,
) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, AvailabilityCheckError> {
if let Some(pending_components) = self.critical.read().peek(&data_column_id.block_root) {
Ok(pending_components
.verified_data_columns
.iter()
.find(|data_column| data_column.as_data_column().index == data_column_id.index)
.map(|data_column| data_column.clone_arc()))
block_root: Hash256,
) -> Result<Option<DataColumnSidecarList<T::EthSpec>>, AvailabilityCheckError> {
if let Some(pending_components) = self.critical.read().peek(&block_root) {
Ok(Some(
pending_components
.verified_data_columns
.iter()
.map(|col| col.clone_arc())
.collect(),
))
} else {
Ok(None)
}
Expand Down
11 changes: 4 additions & 7 deletions beacon_node/beacon_chain/src/data_column_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::iter;
use std::marker::PhantomData;
use std::sync::Arc;
use tracing::debug;
use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier};
use types::data_column_sidecar::ColumnIndex;
use types::{
BeaconStateError, ChainSpec, DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256,
RuntimeVariableList, SignedBeaconBlockHeader, Slot,
Expand Down Expand Up @@ -201,11 +201,8 @@ impl<T: BeaconChainTypes, O: ObservationStrategy> GossipVerifiedDataColumn<T, O>
)
}

pub fn id(&self) -> DataColumnIdentifier {
DataColumnIdentifier {
block_root: self.block_root,
index: self.data_column.index(),
}
pub fn id(&self) -> ColumnIndex {
self.data_column.index()
}

pub fn as_data_column(&self) -> &DataColumnSidecar<T::EthSpec> {
Expand Down Expand Up @@ -742,7 +739,7 @@ pub fn observe_gossip_data_column<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
) -> Result<(), GossipDataColumnError> {
// Now the signature is valid, store the proposal so we don't accept another data column sidecar
// with the same `DataColumnIdentifier`. It's important to double-check that the proposer still
// with the same `ColumnIndex`. It's important to double-check that the proposer still
// hasn't been observed so we don't have a race-condition when verifying two blocks
// simultaneously.
//
Expand Down
36 changes: 20 additions & 16 deletions beacon_node/lighthouse_network/src/rpc/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ use std::marker::PhantomData;
use std::sync::Arc;
use tokio_util::codec::{Decoder, Encoder};
use types::{
BlobSidecar, ChainSpec, DataColumnSidecar, EthSpec, ForkContext, ForkName, Hash256,
LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate,
LightClientUpdate, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockAltair,
SignedBeaconBlockBase, SignedBeaconBlockBellatrix, SignedBeaconBlockCapella,
SignedBeaconBlockDeneb, SignedBeaconBlockElectra, SignedBeaconBlockFulu,
BlobSidecar, ChainSpec, DataColumnSidecar, DataColumnsByRootIdentifier, EthSpec, ForkContext,
ForkName, Hash256, LightClientBootstrap, LightClientFinalityUpdate,
LightClientOptimisticUpdate, LightClientUpdate, RuntimeVariableList, SignedBeaconBlock,
SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix,
SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockElectra,
SignedBeaconBlockFulu,
};
use unsigned_varint::codec::Uvi;

Expand Down Expand Up @@ -596,10 +597,12 @@ fn handle_rpc_request<E: EthSpec>(
))),
SupportedProtocol::DataColumnsByRootV1 => Ok(Some(RequestType::DataColumnsByRoot(
DataColumnsByRootRequest {
data_column_ids: RuntimeVariableList::from_ssz_bytes(
decoded_buffer,
spec.max_request_data_column_sidecars as usize,
)?,
data_column_ids:
<RuntimeVariableList<DataColumnsByRootIdentifier>>::from_ssz_bytes_with_nested(
decoded_buffer,
spec.max_request_blocks(current_fork),
spec.number_of_columns as usize,
)?,
},
))),
SupportedProtocol::PingV1 => Ok(Some(RequestType::Ping(Ping {
Expand Down Expand Up @@ -935,8 +938,8 @@ mod tests {
use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield};
use types::{
blob_sidecar::BlobIdentifier, data_column_sidecar::Cell, BeaconBlock, BeaconBlockAltair,
BeaconBlockBase, BeaconBlockBellatrix, BeaconBlockHeader, DataColumnIdentifier, EmptyBlock,
Epoch, FixedBytesExtended, FullPayload, KzgCommitment, KzgProof, Signature,
BeaconBlockBase, BeaconBlockBellatrix, BeaconBlockHeader, DataColumnsByRootIdentifier,
EmptyBlock, Epoch, FixedBytesExtended, FullPayload, KzgCommitment, KzgProof, Signature,
SignedBeaconBlockHeader, Slot,
};

Expand Down Expand Up @@ -1066,14 +1069,15 @@ mod tests {
}
}

fn dcbroot_request(spec: &ChainSpec) -> DataColumnsByRootRequest {
fn dcbroot_request(spec: &ChainSpec, fork_name: ForkName) -> DataColumnsByRootRequest {
let number_of_columns = spec.number_of_columns as usize;
DataColumnsByRootRequest {
data_column_ids: RuntimeVariableList::new(
vec![DataColumnIdentifier {
vec![DataColumnsByRootIdentifier {
block_root: Hash256::zero(),
index: 0,
indices: RuntimeVariableList::from_vec(vec![0, 1, 2], number_of_columns),
}],
spec.max_request_data_column_sidecars as usize,
spec.max_request_blocks(fork_name),
)
.unwrap(),
}
Expand Down Expand Up @@ -1904,7 +1908,6 @@ mod tests {
RequestType::MetaData(MetadataRequest::new_v1()),
RequestType::BlobsByRange(blbrange_request()),
RequestType::DataColumnsByRange(dcbrange_request()),
RequestType::DataColumnsByRoot(dcbroot_request(&chain_spec)),
RequestType::MetaData(MetadataRequest::new_v2()),
];
for req in requests.iter() {
Expand All @@ -1920,6 +1923,7 @@ mod tests {
RequestType::BlobsByRoot(blbroot_request(fork_name)),
RequestType::BlocksByRoot(bbroot_request_v1(fork_name)),
RequestType::BlocksByRoot(bbroot_request_v2(fork_name)),
RequestType::DataColumnsByRoot(dcbroot_request(&chain_spec, fork_name)),
]
};
for fork_name in ForkName::list_all() {
Expand Down
29 changes: 7 additions & 22 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use serde::Serialize;
use ssz::Encode;
use ssz_derive::{Decode, Encode};
use ssz_types::{typenum::U256, VariableList};
use std::collections::BTreeMap;
use std::fmt::Display;
use std::marker::PhantomData;
use std::ops::Deref;
Expand All @@ -16,9 +15,10 @@ use superstruct::superstruct;
use types::blob_sidecar::BlobIdentifier;
use types::light_client_update::MAX_REQUEST_LIGHT_CLIENT_UPDATES;
use types::{
blob_sidecar::BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar,
Epoch, EthSpec, Hash256, LightClientBootstrap, LightClientFinalityUpdate,
LightClientOptimisticUpdate, LightClientUpdate, RuntimeVariableList, SignedBeaconBlock, Slot,
blob_sidecar::BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar,
DataColumnsByRootIdentifier, Epoch, EthSpec, Hash256, LightClientBootstrap,
LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, RuntimeVariableList,
SignedBeaconBlock, Slot,
};
use types::{ForkContext, ForkName};

Expand Down Expand Up @@ -479,32 +479,17 @@ impl BlobsByRootRequest {
#[derive(Clone, Debug, PartialEq)]
pub struct DataColumnsByRootRequest {
/// The list of beacon block roots and column indices being requested.
pub data_column_ids: RuntimeVariableList<DataColumnIdentifier>,
pub data_column_ids: RuntimeVariableList<DataColumnsByRootIdentifier>,
}

impl DataColumnsByRootRequest {
pub fn new(data_column_ids: Vec<DataColumnIdentifier>, spec: &ChainSpec) -> Self {
pub fn new(data_column_ids: Vec<DataColumnsByRootIdentifier>, spec: &ChainSpec) -> Self {
let data_column_ids = RuntimeVariableList::from_vec(
data_column_ids,
spec.max_request_data_column_sidecars as usize,
spec.max_request_blocks(ForkName::Deneb),
);
Self { data_column_ids }
}

pub fn new_single(block_root: Hash256, index: ColumnIndex, spec: &ChainSpec) -> Self {
Self::new(vec![DataColumnIdentifier { block_root, index }], spec)
}

pub fn group_by_ordered_block_root(&self) -> Vec<(Hash256, Vec<ColumnIndex>)> {
let mut column_indexes_by_block = BTreeMap::<Hash256, Vec<ColumnIndex>>::new();
for request_id in self.data_column_ids.as_slice() {
column_indexes_by_block
.entry(request_id.block_root)
.or_default()
.push(request_id.index);
}
column_indexes_by_block.into_iter().collect()
}
}

/// Request a number of beacon data columns from a peer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1130,7 +1130,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let processing_start_time = Instant::now();
let block_root = verified_data_column.block_root();
let data_column_slot = verified_data_column.slot();
let data_column_index = verified_data_column.id().index;
let data_column_index = verified_data_column.id();

let result = self
.chain
Expand Down
28 changes: 15 additions & 13 deletions beacon_node/network/src/network_beacon_processor/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,24 +360,26 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
) -> Result<(), (RpcErrorResponse, &'static str)> {
let mut send_data_column_count = 0;

for data_column_id in request.data_column_ids.as_slice() {
match self.chain.get_data_column_checking_all_caches(
data_column_id.block_root,
data_column_id.index,
for data_column_ids_by_root in request.data_column_ids.as_slice() {
match self.chain.get_data_columns_checking_all_caches(
data_column_ids_by_root.block_root,
data_column_ids_by_root.indices.as_slice(),
) {
Ok(Some(data_column)) => {
send_data_column_count += 1;
self.send_response(
peer_id,
inbound_request_id,
Response::DataColumnsByRoot(Some(data_column)),
);
Ok(Some(data_columns)) => {
send_data_column_count += data_columns.len();
for data_column in data_columns {
self.send_response(
peer_id,
inbound_request_id,
Response::DataColumnsByRoot(Some(data_column)),
);
}
}
Ok(None) => {} // no-op
Err(e) => {
// TODO(das): lower log level when feature is stabilized
error!(
block_root = ?data_column_id.block_root,
block_root = ?data_column_ids_by_root.block_root,
%peer_id,
error = ?e,
"Error getting data column"
Expand All @@ -389,7 +391,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {

debug!(
%peer_id,
request = ?request.group_by_ordered_block_root(),
request = ?request.data_column_ids,
returned = send_data_column_count,
"Received DataColumnsByRoot Request"
);
Expand Down
4 changes: 3 additions & 1 deletion beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {

self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: RequestType::DataColumnsByRoot(request.clone().into_request(&self.chain.spec)),
request: RequestType::DataColumnsByRoot(
request.clone().try_into_request(&self.chain.spec)?,
),
app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(id)),
})?;

Expand Down
Loading