Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions beacon_node/beacon_chain/src/kzg_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ pub(crate) fn build_data_column_sidecars<E: EthSpec>(
///
/// If `blob_indices_opt` is `None`, this function attempts to reconstruct all blobs associated
/// with the block.
/// This function does NOT use rayon as this is primarily used by a non critical path in HTTP API
/// and it will be slow if the node needs to reconstruct the blobs
pub fn reconstruct_blobs<E: EthSpec>(
kzg: &Kzg,
data_columns: &[Arc<DataColumnSidecar<E>>],
Expand All @@ -320,7 +322,7 @@ pub fn reconstruct_blobs<E: EthSpec>(
};

let blob_sidecars = blob_indices
.into_par_iter()
.into_iter()
.map(|row_index| {
let mut cells: Vec<KzgCellRef> = vec![];
let mut cell_ids: Vec<u64> = vec![];
Expand All @@ -337,16 +339,26 @@ pub fn reconstruct_blobs<E: EthSpec>(
cell_ids.push(data_column.index);
}

let (cells, _kzg_proofs) = kzg
.recover_cells_and_compute_kzg_proofs(&cell_ids, &cells)
.map_err(|e| format!("Failed to recover cells and compute KZG proofs: {e:?}"))?;
let num_cells_original_blob = E::number_of_columns() / 2;
let blob_bytes = if data_columns.len() < E::number_of_columns() {
let (recovered_cells, _kzg_proofs) = kzg
.recover_cells_and_compute_kzg_proofs(&cell_ids, &cells)
.map_err(|e| {
format!("Failed to recover cells and compute KZG proofs: {e:?}")
})?;

let num_cells_original_blob = cells.len() / 2;
let blob_bytes = cells
.into_iter()
.take(num_cells_original_blob)
.flat_map(|cell| cell.into_iter())
.collect();
recovered_cells
.into_iter()
.take(num_cells_original_blob)
.flat_map(|cell| cell.into_iter())
.collect()
} else {
cells
.into_iter()
.take(num_cells_original_blob)
.flat_map(|cell| (*cell).into_iter())
.collect()
};

let blob = Blob::<E>::new(blob_bytes).map_err(|e| format!("{e:?}"))?;
let kzg_proof = KzgProof::empty();
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ where
let blobs = if block.message().body().has_blobs() {
debug!("Downloading finalized blobs");
if let Some(response) = remote
.get_blobs::<E>(BlockId::Root(block_root), None, &spec)
.get_blob_sidecars::<E>(BlockId::Root(block_root), None, &spec)
.await
.map_err(|e| format!("Error fetching finalized blobs from remote: {e:?}"))?
{
Expand Down
71 changes: 67 additions & 4 deletions beacon_node/http_api/src/block_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ use crate::version::inconsistent_fork_rejection;
use crate::{ExecutionOptimistic, state_id::checkpoint_slot_and_execution_optimistic};
use beacon_chain::kzg_utils::reconstruct_blobs;
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped};
use eth2::types::BlobIndicesQuery;
use eth2::types::BlockId as CoreBlockId;
use eth2::types::DataColumnIndicesQuery;
use eth2::types::{BlobIndicesQuery, BlobWrapper, BlobsVersionedHashesQuery};
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use types::{
BlobSidecarList, DataColumnSidecarList, EthSpec, FixedBytesExtended, ForkName, Hash256,
SignedBeaconBlock, SignedBlindedBeaconBlock, Slot,
SignedBeaconBlock, SignedBlindedBeaconBlock, Slot, UnversionedResponse,
beacon_response::ExecutionOptimisticFinalizedMetadata,
};
use warp::Rejection;

Expand Down Expand Up @@ -352,6 +353,68 @@ impl BlockId {
Ok((block, blob_sidecar_list, execution_optimistic, finalized))
}

#[allow(clippy::type_complexity)]
pub fn get_blobs_by_versioned_hashes<T: BeaconChainTypes>(
&self,
query: BlobsVersionedHashesQuery,
chain: &BeaconChain<T>,
) -> Result<
UnversionedResponse<Vec<BlobWrapper<T::EthSpec>>, ExecutionOptimisticFinalizedMetadata>,
warp::Rejection,
> {
let (root, execution_optimistic, finalized) = self.root(chain)?;
let block = BlockId::blinded_block_by_root(&root, chain)?.ok_or_else(|| {
warp_utils::reject::custom_not_found(format!("beacon block with root {}", root))
})?;

// Error if the block is pre-Deneb and lacks blobs.
let blob_kzg_commitments = block.message().body().blob_kzg_commitments().map_err(|_| {
warp_utils::reject::custom_bad_request(
"block is pre-Deneb and has no blobs".to_string(),
)
})?;

let blob_indices_opt = query.versioned_hashes.map(|versioned_hashes| {
versioned_hashes
.iter()
.flat_map(|versioned_hash| {
blob_kzg_commitments.iter().position(|commitment| {
let computed_hash = commitment.calculate_versioned_hash();
computed_hash == *versioned_hash
})
})
.map(|index| index as u64)
.collect::<Vec<_>>()
});

let max_blobs_per_block = chain.spec.max_blobs_per_block(block.epoch()) as usize;
let blob_sidecar_list = if !blob_kzg_commitments.is_empty() {
if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
Self::get_blobs_from_data_columns(chain, root, blob_indices_opt, &block)?
} else {
Self::get_blobs(chain, root, blob_indices_opt, max_blobs_per_block)?
}
} else {
BlobSidecarList::new(vec![], max_blobs_per_block)
.map_err(|e| warp_utils::reject::custom_server_error(format!("{:?}", e)))?
};

let blobs = blob_sidecar_list
.into_iter()
.map(|sidecar| BlobWrapper::<T::EthSpec> {
blob: sidecar.blob.clone(),
})
.collect();

Ok(UnversionedResponse {
metadata: ExecutionOptimisticFinalizedMetadata {
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
},
data: blobs,
})
}

fn get_blobs<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
root: Hash256,
Expand All @@ -369,9 +432,9 @@ impl BlockId {

let blob_sidecar_list_filtered = match indices {
Some(vec) => {
let list: Vec<_> = blob_sidecar_list
let list: Vec<_> = vec
.into_iter()
.filter(|blob_sidecar| vec.contains(&blob_sidecar.index))
.flat_map(|index| blob_sidecar_list.get(index as usize).cloned())
.collect();

BlobSidecarList::new(list, max_blobs_per_block)
Expand Down
50 changes: 49 additions & 1 deletion beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ pub fn prometheus_metrics() -> warp::filters::log::Log<impl Fn(warp::filters::lo
equals("v1/beacon/blocks")
.or_else(|| starts_with("v2/beacon/blocks"))
.or_else(|| starts_with("v1/beacon/blob_sidecars"))
.or_else(|| starts_with("v1/beacon/blobs"))
.or_else(|| starts_with("v1/beacon/blocks/head/root"))
.or_else(|| starts_with("v1/beacon/blinded_blocks"))
.or_else(|| starts_with("v2/beacon/blinded_blocks"))
Expand Down Expand Up @@ -1897,7 +1898,7 @@ pub fn serve<T: BeaconChainTypes>(
*/

// GET beacon/blob_sidecars/{block_id}
let get_blobs = eth_v1
let get_blob_sidecars = eth_v1
.and(warp::path("beacon"))
.and(warp::path("blob_sidecars"))
.and(block_id_or_err)
Expand Down Expand Up @@ -1947,6 +1948,52 @@ pub fn serve<T: BeaconChainTypes>(
},
);

// GET beacon/blobs/{block_id}
let get_blobs = eth_v1
.and(warp::path("beacon"))
.and(warp::path("blobs"))
.and(block_id_or_err)
.and(warp::path::end())
.and(multi_key_query::<api_types::BlobsVersionedHashesQuery>())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(warp::header::optional::<api_types::Accept>("accept"))
.then(
|block_id: BlockId,
version_hashes_res: Result<api_types::BlobsVersionedHashesQuery, warp::Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
accept_header: Option<api_types::Accept>| {
task_spawner.blocking_response_task(Priority::P1, move || {
let versioned_hashes = version_hashes_res?;
let response =
block_id.get_blobs_by_versioned_hashes(versioned_hashes, &chain)?;

match accept_header {
Some(api_types::Accept::Ssz) => Response::builder()
.status(200)
.body(response.data.as_ssz_bytes().into())
.map(|res: Response<Body>| add_ssz_content_type_header(res))
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"failed to create response: {}",
e
))
}),
_ => {
let res = execution_optimistic_finalized_beacon_response(
ResponseIncludesVersion::No,
response.metadata.execution_optimistic.unwrap_or(false),
response.metadata.finalized.unwrap_or(false),
response.data,
)?;
Ok(warp::reply::json(&res).into_response())
}
}
})
},
);

/*
* beacon/pool
*/
Expand Down Expand Up @@ -4794,6 +4841,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(get_beacon_block_attestations)
.uor(get_beacon_blinded_block)
.uor(get_beacon_block_root)
.uor(get_blob_sidecars)
.uor(get_blobs)
.uor(get_beacon_pool_attestations)
.uor(get_beacon_pool_attester_slashings)
Expand Down
Loading
Loading