32 changes: 21 additions & 11 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -127,7 +127,7 @@ use tokio_stream::Stream;
 use tracing::{debug, error, info, trace, warn};
 use tree_hash::TreeHash;
 use types::blob_sidecar::FixedBlobSidecarList;
-use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier};
+use types::data_column_sidecar::ColumnIndex;
 use types::payload::BlockProductionVersion;
 use types::*;

@@ -1106,23 +1106,33 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             .map_or_else(|| self.get_blobs(block_root), Ok)
     }

-    pub fn get_data_column_checking_all_caches(
+    pub fn get_data_columns_checking_all_caches(
         &self,
         block_root: Hash256,
-        index: ColumnIndex,
-    ) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, Error> {
-        if let Some(column) = self
+        indices: &[ColumnIndex],
+    ) -> Result<Option<DataColumnSidecarList<T::EthSpec>>, Error> {
+        let columns = if let Some(all_cols) = self
             .data_availability_checker
-            .get_data_column(&DataColumnIdentifier { block_root, index })?
+            .get_data_columns(block_root)?
         {
-            return Ok(Some(column));
-        }
+            Ok(Some(all_cols))
+        } else if let Some(all_cols) = self.early_attester_cache.get_data_columns(block_root) {
+            Ok(Some(all_cols))
+        } else {
+            self.get_data_columns(&block_root)
+        };

-        if let Some(columns) = self.early_attester_cache.get_data_columns(block_root) {
-            return Ok(columns.iter().find(|c| c.index == index).cloned());
+        if let Ok(Some(columns)) = columns {
+            return Ok(Some(
+                columns
+                    .iter()
+                    .filter(|col| indices.contains(&col.index))
+                    .cloned()
+                    .collect(),
+            ));
         }

-        self.get_data_column(&block_root, &index)
+        columns
     }

     /// Returns the block at the given root, if any.
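The new method's behaviour is easiest to see in isolation: one probe per cache returns every column stored for the block root, and a single pass then keeps only the requested indices. Below is a minimal, self-contained sketch of that filtering step; `Column` stands in for `Arc<DataColumnSidecar<E>>` and all names are illustrative, not the PR's code.

```rust
// Stand-in for Arc<DataColumnSidecar<E>>; only the index matters here.
#[derive(Clone, Debug, PartialEq)]
struct Column {
    index: u64,
}

// Mirrors the method's final step: filter the cached columns for a
// block down to the requested indices.
fn filter_columns(all: &[Column], indices: &[u64]) -> Vec<Column> {
    all.iter()
        .filter(|col| indices.contains(&col.index))
        .cloned()
        .collect()
}

fn main() {
    let cached = vec![
        Column { index: 0 },
        Column { index: 3 },
        Column { index: 7 },
    ];
    // Request columns 3 and 5: only 3 is cached, so one column comes back.
    assert_eq!(filter_columns(&cached, &[3, 5]), vec![Column { index: 3 }]);
}
```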
12 changes: 6 additions & 6 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -17,8 +17,8 @@ use task_executor::TaskExecutor;
 use tracing::{debug, error, info_span, Instrument};
 use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
 use types::{
-    BlobSidecarList, ChainSpec, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList,
-    Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock,
+    BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, Hash256,
+    RuntimeVariableList, SignedBeaconBlock,
 };

 mod error;
@@ -164,11 +164,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
     }

     /// Get a data column from the availability cache.
-    pub fn get_data_column(
+    pub fn get_data_columns(
         &self,
-        data_column_id: &DataColumnIdentifier,
-    ) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, AvailabilityCheckError> {
-        self.availability_cache.peek_data_column(data_column_id)
+        block_root: Hash256,
+    ) -> Result<Option<DataColumnSidecarList<T::EthSpec>>, AvailabilityCheckError> {
+        self.availability_cache.peek_data_columns(block_root)
     }

     /// Put a list of blobs received via RPC into the availability cache. This performs KZG
22 changes: 12 additions & 10 deletions beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -16,7 +16,7 @@ use std::sync::Arc;
 use tracing::debug;
 use types::blob_sidecar::BlobIdentifier;
 use types::{
-    BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar, Epoch, EthSpec,
+    BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec,
     Hash256, RuntimeFixedVector, RuntimeVariableList, SignedBeaconBlock,
 };
@@ -405,16 +405,18 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
     }

     /// Fetch a data column from the cache without affecting the LRU ordering
-    pub fn peek_data_column(
+    pub fn peek_data_columns(
         &self,
-        data_column_id: &DataColumnIdentifier,
-    ) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, AvailabilityCheckError> {
-        if let Some(pending_components) = self.critical.read().peek(&data_column_id.block_root) {
-            Ok(pending_components
-                .verified_data_columns
-                .iter()
-                .find(|data_column| data_column.as_data_column().index == data_column_id.index)
-                .map(|data_column| data_column.clone_arc()))
+        block_root: Hash256,
+    ) -> Result<Option<DataColumnSidecarList<T::EthSpec>>, AvailabilityCheckError> {
+        if let Some(pending_components) = self.critical.read().peek(&block_root) {
+            Ok(Some(
+                pending_components
+                    .verified_data_columns
+                    .iter()
+                    .map(|col| col.clone_arc())
+                    .collect(),
+            ))
         } else {
             Ok(None)
         }
36 changes: 20 additions & 16 deletions beacon_node/lighthouse_network/src/rpc/codec.rs
@@ -16,11 +16,12 @@ use std::marker::PhantomData;
 use std::sync::Arc;
 use tokio_util::codec::{Decoder, Encoder};
 use types::{
-    BlobSidecar, ChainSpec, DataColumnSidecar, EthSpec, ForkContext, ForkName, Hash256,
-    LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate,
-    LightClientUpdate, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockAltair,
-    SignedBeaconBlockBase, SignedBeaconBlockBellatrix, SignedBeaconBlockCapella,
-    SignedBeaconBlockDeneb, SignedBeaconBlockElectra, SignedBeaconBlockFulu,
+    BlobSidecar, ChainSpec, DataColumnSidecar, DataColumnsByRootIdentifier, EthSpec, ForkContext,
+    ForkName, Hash256, LightClientBootstrap, LightClientFinalityUpdate,
+    LightClientOptimisticUpdate, LightClientUpdate, RuntimeVariableList, SignedBeaconBlock,
+    SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix,
+    SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockElectra,
+    SignedBeaconBlockFulu,
 };
 use unsigned_varint::codec::Uvi;

@@ -596,10 +597,12 @@ fn handle_rpc_request<E: EthSpec>(
         ))),
         SupportedProtocol::DataColumnsByRootV1 => Ok(Some(RequestType::DataColumnsByRoot(
             DataColumnsByRootRequest {
-                data_column_ids: RuntimeVariableList::from_ssz_bytes(
-                    decoded_buffer,
-                    spec.max_request_data_column_sidecars as usize,
-                )?,
+                data_column_ids:
+                    <RuntimeVariableList<DataColumnsByRootIdentifier>>::from_ssz_bytes_with_nested(
+                        decoded_buffer,
+                        spec.max_request_blocks(current_fork),
+                        spec.number_of_columns as usize,
+                    )?,
             },
         ))),
         SupportedProtocol::PingV1 => Ok(Some(RequestType::Ping(Ping {
@@ -935,8 +938,8 @@ mod tests {
     use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield};
     use types::{
         blob_sidecar::BlobIdentifier, data_column_sidecar::Cell, BeaconBlock, BeaconBlockAltair,
-        BeaconBlockBase, BeaconBlockBellatrix, BeaconBlockHeader, DataColumnIdentifier, EmptyBlock,
-        Epoch, FixedBytesExtended, FullPayload, KzgCommitment, KzgProof, Signature,
+        BeaconBlockBase, BeaconBlockBellatrix, BeaconBlockHeader, DataColumnsByRootIdentifier,
+        EmptyBlock, Epoch, FixedBytesExtended, FullPayload, KzgCommitment, KzgProof, Signature,
         SignedBeaconBlockHeader, Slot,
     };

@@ -1066,14 +1069,15 @@ mod tests {
         }
     }

-    fn dcbroot_request(spec: &ChainSpec) -> DataColumnsByRootRequest {
+    fn dcbroot_request(spec: &ChainSpec, fork_name: ForkName) -> DataColumnsByRootRequest {
+        let number_of_columns = spec.number_of_columns as usize;
         DataColumnsByRootRequest {
             data_column_ids: RuntimeVariableList::new(
-                vec![DataColumnIdentifier {
+                vec![DataColumnsByRootIdentifier {
                     block_root: Hash256::zero(),
-                    index: 0,
+                    indices: RuntimeVariableList::from_vec(vec![0, 1, 2], number_of_columns),
                 }],
-                spec.max_request_data_column_sidecars as usize,
+                spec.max_request_blocks(fork_name),
             )
             .unwrap(),
         }
@@ -1904,7 +1908,6 @@ mod tests {
             RequestType::MetaData(MetadataRequest::new_v1()),
             RequestType::BlobsByRange(blbrange_request()),
             RequestType::DataColumnsByRange(dcbrange_request()),
-            RequestType::DataColumnsByRoot(dcbroot_request(&chain_spec)),
             RequestType::MetaData(MetadataRequest::new_v2()),
         ];
         for req in requests.iter() {
@@ -1920,6 +1923,7 @@
                 RequestType::BlobsByRoot(blbroot_request(fork_name)),
                 RequestType::BlocksByRoot(bbroot_request_v1(fork_name)),
                 RequestType::BlocksByRoot(bbroot_request_v2(fork_name)),
+                RequestType::DataColumnsByRoot(dcbroot_request(&chain_spec, fork_name)),
             ]
         };
         for fork_name in ForkName::list_all() {
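The decoder change above threads two separate limits into one decode: an outer bound on the number of per-block entries (`spec.max_request_blocks(current_fork)`) and an inner bound on each entry's index list (`spec.number_of_columns`). A self-contained sketch of that two-level check, assuming `from_ssz_bytes_with_nested` enforces something equivalent during SSZ decoding; the helper below is illustrative, not the real SSZ API.

```rust
// Illustrative two-level bound check; in the PR the real work happens
// inside RuntimeVariableList::from_ssz_bytes_with_nested while decoding.
fn check_nested_bounds(
    requests: &[Vec<u64>],
    max_blocks: usize,  // outer bound: spec.max_request_blocks(fork)
    max_columns: usize, // inner bound: spec.number_of_columns
) -> Result<(), String> {
    if requests.len() > max_blocks {
        return Err(format!("too many block roots: {}", requests.len()));
    }
    for (i, indices) in requests.iter().enumerate() {
        if indices.len() > max_columns {
            return Err(format!("entry {i} requests {} columns", indices.len()));
        }
    }
    Ok(())
}

fn main() {
    // Both limits are 128 here purely for illustration.
    assert!(check_nested_bounds(&[vec![0, 1, 2]], 128, 128).is_ok());
    assert!(check_nested_bounds(&[(0..200).collect()], 128, 128).is_err());
}
```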
27 changes: 6 additions & 21 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
@@ -6,7 +6,6 @@ use serde::Serialize;
 use ssz::Encode;
 use ssz_derive::{Decode, Encode};
 use ssz_types::{typenum::U256, VariableList};
-use std::collections::BTreeMap;
 use std::fmt::Display;
 use std::marker::PhantomData;
 use std::ops::Deref;
@@ -16,9 +15,10 @@ use superstruct::superstruct;
 use types::blob_sidecar::BlobIdentifier;
 use types::light_client_update::MAX_REQUEST_LIGHT_CLIENT_UPDATES;
 use types::{
-    blob_sidecar::BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar,
-    Epoch, EthSpec, Hash256, LightClientBootstrap, LightClientFinalityUpdate,
-    LightClientOptimisticUpdate, LightClientUpdate, RuntimeVariableList, SignedBeaconBlock, Slot,
+    blob_sidecar::BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar,
+    DataColumnsByRootIdentifier, Epoch, EthSpec, Hash256, LightClientBootstrap,
+    LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, RuntimeVariableList,
+    SignedBeaconBlock, Slot,
 };
 use types::{ForkContext, ForkName};

@@ -479,32 +479,17 @@ impl BlobsByRootRequest {
 #[derive(Clone, Debug, PartialEq)]
 pub struct DataColumnsByRootRequest {
     /// The list of beacon block roots and column indices being requested.
-    pub data_column_ids: RuntimeVariableList<DataColumnIdentifier>,
+    pub data_column_ids: RuntimeVariableList<DataColumnsByRootIdentifier>,
 }

 impl DataColumnsByRootRequest {
-    pub fn new(data_column_ids: Vec<DataColumnIdentifier>, spec: &ChainSpec) -> Self {
+    pub fn new(data_column_ids: Vec<DataColumnsByRootIdentifier>, spec: &ChainSpec) -> Self {
         let data_column_ids = RuntimeVariableList::from_vec(
             data_column_ids,
             spec.max_request_data_column_sidecars as usize,
         );
         Self { data_column_ids }
     }
-
-    pub fn new_single(block_root: Hash256, index: ColumnIndex, spec: &ChainSpec) -> Self {
-        Self::new(vec![DataColumnIdentifier { block_root, index }], spec)
-    }
-
-    pub fn group_by_ordered_block_root(&self) -> Vec<(Hash256, Vec<ColumnIndex>)> {
-        let mut column_indexes_by_block = BTreeMap::<Hash256, Vec<ColumnIndex>>::new();
-        for request_id in self.data_column_ids.as_slice() {
-            column_indexes_by_block
-                .entry(request_id.block_root)
-                .or_default()
-                .push(request_id.index);
-        }
-        column_indexes_by_block.into_iter().collect()
-    }
 }

 /// Request a number of beacon data columns from a peer.
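`new_single` and `group_by_ordered_block_root` can be dropped because the grouping they provided is now inherent in the wire type: each `DataColumnsByRootIdentifier` carries one block root together with all of its requested indices. A minimal sketch of the shape change; the structs below mirror the wire types in shape only, with simplified field types.

```rust
// Old wire shape: one (root, index) pair per entry; responders regrouped
// entries per root with a BTreeMap before answering.
#[allow(dead_code)]
struct OldIdentifier {
    block_root: [u8; 32],
    index: u64,
}

// New wire shape: one entry per root, already carrying all its indices.
#[allow(dead_code)]
struct NewIdentifier {
    block_root: [u8; 32],
    indices: Vec<u64>,
}

fn main() {
    let old = vec![
        OldIdentifier { block_root: [0; 32], index: 0 },
        OldIdentifier { block_root: [0; 32], index: 1 },
    ];
    let new = vec![NewIdentifier {
        block_root: [0; 32],
        indices: vec![0, 1],
    }];
    // Two old entries for the same root collapse into a single new entry.
    assert_eq!(old.len(), 2);
    assert_eq!(new.len(), 1);
}
```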
28 changes: 15 additions & 13 deletions beacon_node/network/src/network_beacon_processor/rpc_methods.rs
@@ -360,24 +360,26 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     ) -> Result<(), (RpcErrorResponse, &'static str)> {
         let mut send_data_column_count = 0;

-        for data_column_id in request.data_column_ids.as_slice() {
-            match self.chain.get_data_column_checking_all_caches(
-                data_column_id.block_root,
-                data_column_id.index,
+        for data_column_ids_by_root in request.data_column_ids.as_slice() {
+            match self.chain.get_data_columns_checking_all_caches(
+                data_column_ids_by_root.block_root,
+                &Vec::from(data_column_ids_by_root.indices.clone()),
             ) {
-                Ok(Some(data_column)) => {
-                    send_data_column_count += 1;
-                    self.send_response(
-                        peer_id,
-                        inbound_request_id,
-                        Response::DataColumnsByRoot(Some(data_column)),
-                    );
+                Ok(Some(data_columns)) => {
+                    send_data_column_count += data_columns.len();
+                    for data_column in data_columns {
+                        self.send_response(
+                            peer_id,
+                            inbound_request_id,
+                            Response::DataColumnsByRoot(Some(data_column)),
+                        );
+                    }
                 }
                 Ok(None) => {} // no-op
                 Err(e) => {
                     // TODO(das): lower log level when feature is stabilized
                     error!(
-                        block_root = ?data_column_id.block_root,
+                        block_root = ?data_column_ids_by_root.block_root,
                         %peer_id,
                         error = ?e,
                         "Error getting data column"
@@ -389,7 +391,7 @@

         debug!(
             %peer_id,
-            request = ?request.group_by_ordered_block_root(),
+            request = ?request.data_column_ids,
             returned = send_data_column_count,
             "Received DataColumnsByRoot Request"
21 changes: 13 additions & 8 deletions beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs
@@ -1,6 +1,9 @@
 use lighthouse_network::rpc::methods::DataColumnsByRootRequest;
 use std::sync::Arc;
-use types::{ChainSpec, DataColumnIdentifier, DataColumnSidecar, EthSpec, Hash256};
+use types::{
+    ChainSpec, DataColumnSidecar, DataColumnsByRootIdentifier, EthSpec, Hash256,
+    RuntimeVariableList,
+};

 use super::{ActiveRequestItems, LookupVerifyError};

@@ -12,14 +15,16 @@ pub struct DataColumnsByRootSingleBlockRequest {

 impl DataColumnsByRootSingleBlockRequest {
     pub fn into_request(self, spec: &ChainSpec) -> DataColumnsByRootRequest {
+        let number_of_columns = spec.number_of_columns as usize;
+        // TODO: we aren't handling the case where self.indices exceeds the
+        // NUMBER_OF_COLUMNS defined by the spec. Do we handle this elsewhere?
+        // We should probably use RuntimeVariableList::new() and handle errors.
+        let indices = RuntimeVariableList::from_vec(self.indices, number_of_columns);
         DataColumnsByRootRequest::new(
-            self.indices
-                .into_iter()
-                .map(|index| DataColumnIdentifier {
-                    block_root: self.block_root,
-                    index,
-                })
-                .collect(),
+            vec![DataColumnsByRootIdentifier {
+                block_root: self.block_root,
+                indices,
+            }],
             spec,
         )
     }
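On the TODO above: a hedged sketch of the fallible construction it suggests, rejecting oversized index lists up front instead of accepting them. The `BuildError` type and the length check are illustrative stand-ins, not `RuntimeVariableList`'s actual API.

```rust
// Illustrative stand-in for the error that RuntimeVariableList::new()
// would surface when the input exceeds max_len.
#[derive(Debug)]
enum BuildError {
    TooManyIndices { got: usize, max: usize },
}

// Accept the index list only if it fits within the spec-defined bound.
fn bounded_indices(indices: Vec<u64>, max_len: usize) -> Result<Vec<u64>, BuildError> {
    if indices.len() > max_len {
        return Err(BuildError::TooManyIndices {
            got: indices.len(),
            max: max_len,
        });
    }
    Ok(indices)
}

fn main() {
    // NUMBER_OF_COLUMNS is 128 in current PeerDAS parameters; the constant
    // here is purely illustrative.
    assert!(bounded_indices((0..128).collect(), 128).is_ok());
    assert!(bounded_indices((0..129).collect(), 128).is_err());
}
```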
15 changes: 5 additions & 10 deletions beacon_node/network/src/sync/tests/lookups.rs
@@ -976,18 +976,13 @@ impl TestRig {
                 request: RequestType::DataColumnsByRoot(request),
                 app_request_id:
                     AppRequestId::Sync(id @ SyncRequestId::DataColumnsByRoot { .. }),
-            } if request
-                .data_column_ids
-                .to_vec()
-                .iter()
-                .any(|r| r.block_root == block_root) =>
-            {
-                let indices = request
+            } => {
+                let matching = request
                     .data_column_ids
                     .to_vec()
                     .iter()
-                    .map(|cid| cid.index)
-                    .collect::<Vec<_>>();
+                    .find(|id| id.block_root == block_root)?;
+
+                let indices = matching.indices.iter().copied().collect();
                 Some((*id, indices))
             }
             _ => None,