diff --git a/beacon_node/beacon_chain/src/kzg_utils.rs b/beacon_node/beacon_chain/src/kzg_utils.rs index ad669e17291..382775ab50f 100644 --- a/beacon_node/beacon_chain/src/kzg_utils.rs +++ b/beacon_node/beacon_chain/src/kzg_utils.rs @@ -299,6 +299,8 @@ pub(crate) fn build_data_column_sidecars( /// /// If `blob_indices_opt` is `None`, this function attempts to reconstruct all blobs associated /// with the block. +/// This function does NOT use rayon as this is primarily used by a non critical path in HTTP API +/// and it will be slow if the node needs to reconstruct the blobs pub fn reconstruct_blobs( kzg: &Kzg, data_columns: &[Arc>], @@ -320,7 +322,7 @@ pub fn reconstruct_blobs( }; let blob_sidecars = blob_indices - .into_par_iter() + .into_iter() .map(|row_index| { let mut cells: Vec = vec![]; let mut cell_ids: Vec = vec![]; @@ -337,16 +339,26 @@ pub fn reconstruct_blobs( cell_ids.push(data_column.index); } - let (cells, _kzg_proofs) = kzg - .recover_cells_and_compute_kzg_proofs(&cell_ids, &cells) - .map_err(|e| format!("Failed to recover cells and compute KZG proofs: {e:?}"))?; + let num_cells_original_blob = E::number_of_columns() / 2; + let blob_bytes = if data_columns.len() < E::number_of_columns() { + let (recovered_cells, _kzg_proofs) = kzg + .recover_cells_and_compute_kzg_proofs(&cell_ids, &cells) + .map_err(|e| { + format!("Failed to recover cells and compute KZG proofs: {e:?}") + })?; - let num_cells_original_blob = cells.len() / 2; - let blob_bytes = cells - .into_iter() - .take(num_cells_original_blob) - .flat_map(|cell| cell.into_iter()) - .collect(); + recovered_cells + .into_iter() + .take(num_cells_original_blob) + .flat_map(|cell| cell.into_iter()) + .collect() + } else { + cells + .into_iter() + .take(num_cells_original_blob) + .flat_map(|cell| (*cell).into_iter()) + .collect() + }; let blob = Blob::::new(blob_bytes).map_err(|e| format!("{e:?}"))?; let kzg_proof = KzgProof::empty(); diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index d984d5fedce..02c042bf282 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -412,7 +412,7 @@ where let blobs = if block.message().body().has_blobs() { debug!("Downloading finalized blobs"); if let Some(response) = remote - .get_blobs::(BlockId::Root(block_root), None, &spec) + .get_blob_sidecars::(BlockId::Root(block_root), None, &spec) .await .map_err(|e| format!("Error fetching finalized blobs from remote: {e:?}"))? { diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index e527e466f67..778067c32bb 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -2,15 +2,16 @@ use crate::version::inconsistent_fork_rejection; use crate::{ExecutionOptimistic, state_id::checkpoint_slot_and_execution_optimistic}; use beacon_chain::kzg_utils::reconstruct_blobs; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped}; -use eth2::types::BlobIndicesQuery; use eth2::types::BlockId as CoreBlockId; use eth2::types::DataColumnIndicesQuery; +use eth2::types::{BlobIndicesQuery, BlobWrapper, BlobsVersionedHashesQuery}; use std::fmt; use std::str::FromStr; use std::sync::Arc; use types::{ BlobSidecarList, DataColumnSidecarList, EthSpec, FixedBytesExtended, ForkName, Hash256, - SignedBeaconBlock, SignedBlindedBeaconBlock, Slot, + SignedBeaconBlock, SignedBlindedBeaconBlock, Slot, UnversionedResponse, + beacon_response::ExecutionOptimisticFinalizedMetadata, }; use warp::Rejection; @@ -352,6 +353,68 @@ impl BlockId { Ok((block, blob_sidecar_list, execution_optimistic, finalized)) } + #[allow(clippy::type_complexity)] + pub fn get_blobs_by_versioned_hashes( + &self, + query: BlobsVersionedHashesQuery, + chain: &BeaconChain, + ) -> Result< + UnversionedResponse>, ExecutionOptimisticFinalizedMetadata>, + warp::Rejection, + > { + let (root, execution_optimistic, finalized) = self.root(chain)?; + let block = BlockId::blinded_block_by_root(&root, chain)?.ok_or_else(|| { + warp_utils::reject::custom_not_found(format!("beacon block with root {}", root)) + })?; + + // Error if the block is pre-Deneb and lacks blobs. + let blob_kzg_commitments = block.message().body().blob_kzg_commitments().map_err(|_| { + warp_utils::reject::custom_bad_request( + "block is pre-Deneb and has no blobs".to_string(), + ) + })?; + + let blob_indices_opt = query.versioned_hashes.map(|versioned_hashes| { + versioned_hashes + .iter() + .flat_map(|versioned_hash| { + blob_kzg_commitments.iter().position(|commitment| { + let computed_hash = commitment.calculate_versioned_hash(); + computed_hash == *versioned_hash + }) + }) + .map(|index| index as u64) + .collect::>() + }); + + let max_blobs_per_block = chain.spec.max_blobs_per_block(block.epoch()) as usize; + let blob_sidecar_list = if !blob_kzg_commitments.is_empty() { + if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) { + Self::get_blobs_from_data_columns(chain, root, blob_indices_opt, &block)? + } else { + Self::get_blobs(chain, root, blob_indices_opt, max_blobs_per_block)? + } + } else { + BlobSidecarList::new(vec![], max_blobs_per_block) + .map_err(|e| warp_utils::reject::custom_server_error(format!("{:?}", e)))? + }; + + let blobs = blob_sidecar_list + .into_iter() + .map(|sidecar| BlobWrapper:: { + blob: sidecar.blob.clone(), + }) + .collect(); + + Ok(UnversionedResponse { + metadata: ExecutionOptimisticFinalizedMetadata { + execution_optimistic: Some(execution_optimistic), + finalized: Some(finalized), + }, + data: blobs, + }) + } + fn get_blobs( chain: &BeaconChain, root: Hash256, @@ -369,9 +432,9 @@ impl BlockId { let blob_sidecar_list_filtered = match indices { Some(vec) => { - let list: Vec<_> = blob_sidecar_list + let list: Vec<_> = vec .into_iter() - .filter(|blob_sidecar| vec.contains(&blob_sidecar.index)) + .flat_map(|index| blob_sidecar_list.get(index as usize).cloned()) .collect(); BlobSidecarList::new(list, max_blobs_per_block) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 1b18ed50a3f..7f6c97a0f85 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -214,6 +214,7 @@ pub fn prometheus_metrics() -> warp::filters::log::Log( */ // GET beacon/blob_sidecars/{block_id} - let get_blobs = eth_v1 + let get_blob_sidecars = eth_v1 .and(warp::path("beacon")) .and(warp::path("blob_sidecars")) .and(block_id_or_err) @@ -1947,6 +1948,52 @@ pub fn serve( }, ); + // GET beacon/blobs/{block_id} + let get_blobs = eth_v1 + .and(warp::path("beacon")) + .and(warp::path("blobs")) + .and(block_id_or_err) + .and(warp::path::end()) + .and(multi_key_query::()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .and(warp::header::optional::("accept")) + .then( + |block_id: BlockId, + version_hashes_res: Result, + task_spawner: TaskSpawner, + chain: Arc>, + accept_header: Option| { + task_spawner.blocking_response_task(Priority::P1, move || { + let versioned_hashes = version_hashes_res?; + let response = + block_id.get_blobs_by_versioned_hashes(versioned_hashes, &chain)?; + + match accept_header { + Some(api_types::Accept::Ssz) => Response::builder() + .status(200) + .body(response.data.as_ssz_bytes().into()) + .map(|res: Response| add_ssz_content_type_header(res)) + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "failed to create response: {}", + e + )) + }), + _ => { + let res = execution_optimistic_finalized_beacon_response( + ResponseIncludesVersion::No, + response.metadata.execution_optimistic.unwrap_or(false), + response.metadata.finalized.unwrap_or(false), + response.data, + )?; + Ok(warp::reply::json(&res).into_response()) + } + } + }) + }, + ); + /* * beacon/pool */ @@ -4794,6 +4841,7 @@ pub fn serve( .uor(get_beacon_block_attestations) .uor(get_beacon_blinded_block) .uor(get_beacon_block_root) + .uor(get_blob_sidecars) .uor(get_blobs) .uor(get_beacon_pool_attestations) .uor(get_beacon_pool_attester_slashings) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 2072fb9932b..9c18a7c1e87 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -90,6 +90,7 @@ struct ApiTester { struct ApiTesterConfig { spec: ChainSpec, retain_historic_states: bool, + import_all_data_columns: bool, } impl Default for ApiTesterConfig { @@ -99,6 +100,7 @@ impl Default for ApiTesterConfig { Self { spec, retain_historic_states: false, + import_all_data_columns: false, } } } @@ -137,6 +139,7 @@ impl ApiTester { .deterministic_withdrawal_keypairs(VALIDATOR_COUNT) .fresh_ephemeral_store() .mock_execution_layer() + .import_all_data_columns(config.import_all_data_columns) .build(); harness @@ -441,10 +444,7 @@ impl ApiTester { } pub async fn new_mev_tester_default_payload_value() -> Self { - let mut config = ApiTesterConfig { - retain_historic_states: false, - spec: E::default_spec(), - }; + let mut config = ApiTesterConfig::default(); config.spec.altair_fork_epoch = Some(Epoch::new(0)); config.spec.bellatrix_fork_epoch = Some(Epoch::new(0)); let tester = Self::new_from_config(config) @@ -1858,7 +1858,7 @@ impl ApiTester { }; let result = match self .client - .get_blobs::( + .get_blob_sidecars::( CoreBlockId::Root(block_root), blob_indices.as_deref(), &self.chain.spec, @@ -1879,6 +1879,77 @@ impl ApiTester { self } + pub async fn test_get_blobs(self, versioned_hashes: bool) -> Self { + let block_id = BlockId(CoreBlockId::Finalized); + let (block_root, _, _) = block_id.root(&self.chain).unwrap(); + let (block, _, _) = block_id.full_block(&self.chain).await.unwrap(); + let num_blobs = block.num_expected_blobs(); + + let versioned_hashes: Option> = if versioned_hashes { + Some( + block + .message() + .body() + .blob_kzg_commitments() + .unwrap() + .iter() + .map(|commitment| commitment.calculate_versioned_hash()) + .collect(), + ) + } else { + None + }; + + let result = match self + .client + .get_blobs::(CoreBlockId::Root(block_root), versioned_hashes.as_deref()) + .await + { + Ok(response) => response.unwrap().into_data(), + Err(e) => panic!("query failed incorrectly: {e:?}"), + }; + + assert_eq!( + result.len(), + versioned_hashes.map_or(num_blobs, |versioned_hashes| versioned_hashes.len()) + ); + + self + } + + pub async fn test_get_blobs_post_fulu_full_node(self, versioned_hashes: bool) -> Self { + let block_id = BlockId(CoreBlockId::Finalized); + let (block_root, _, _) = block_id.root(&self.chain).unwrap(); + let (block, _, _) = block_id.full_block(&self.chain).await.unwrap(); + + let versioned_hashes: Option> = if versioned_hashes { + Some( + block + .message() + .body() + .blob_kzg_commitments() + .unwrap() + .iter() + .map(|commitment| commitment.calculate_versioned_hash()) + .collect(), + ) + } else { + None + }; + + match self + .client + .get_blobs::(CoreBlockId::Root(block_root), versioned_hashes.as_deref()) + .await + { + Ok(result) => panic!("Full node are unable to return blobs post-Fulu: {result:?}"), + // Post-Fulu, full nodes don't store blobs and return error 500 + Err(e) => assert_eq!(e.status().unwrap(), 500), + }; + + self + } + /// Test fetching of blob sidecars that are not available in the database due to pruning. /// /// If `zero_blobs` is false, test a block with >0 blobs, which should be unavailable. @@ -1918,7 +1989,7 @@ impl ApiTester { match self .client - .get_blobs::(CoreBlockId::Slot(test_slot), None, &self.chain.spec) + .get_blob_sidecars::(CoreBlockId::Slot(test_slot), None, &self.chain.spec) .await { Ok(result) => { @@ -1956,7 +2027,7 @@ impl ApiTester { match self .client - .get_blobs::(CoreBlockId::Slot(test_slot), None, &self.chain.spec) + .get_blob_sidecars::(CoreBlockId::Slot(test_slot), None, &self.chain.spec) .await { Ok(result) => panic!("queries for pre-Deneb slots should fail. got: {result:?}"), @@ -7704,10 +7775,7 @@ async fn builder_payload_chosen_by_profit_v3() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn builder_works_post_capella() { - let mut config = ApiTesterConfig { - retain_historic_states: false, - spec: E::default_spec(), - }; + let mut config = ApiTesterConfig::default(); config.spec.altair_fork_epoch = Some(Epoch::new(0)); config.spec.bellatrix_fork_epoch = Some(Epoch::new(0)); config.spec.capella_fork_epoch = Some(Epoch::new(0)); @@ -7724,10 +7792,7 @@ async fn builder_works_post_capella() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn builder_works_post_deneb() { - let mut config = ApiTesterConfig { - retain_historic_states: false, - spec: E::default_spec(), - }; + let mut config = ApiTesterConfig::default(); config.spec.altair_fork_epoch = Some(Epoch::new(0)); config.spec.bellatrix_fork_epoch = Some(Epoch::new(0)); config.spec.capella_fork_epoch = Some(Epoch::new(0)); @@ -7745,22 +7810,66 @@ async fn builder_works_post_deneb() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_blob_sidecars() { + let mut config = ApiTesterConfig::default(); + config.spec.altair_fork_epoch = Some(Epoch::new(0)); + config.spec.bellatrix_fork_epoch = Some(Epoch::new(0)); + config.spec.capella_fork_epoch = Some(Epoch::new(0)); + config.spec.deneb_fork_epoch = Some(Epoch::new(0)); + + ApiTester::new_from_config(config) + .await + .test_post_beacon_blocks_valid() + .await + .test_get_blob_sidecars(false) + .await + .test_get_blob_sidecars(true) + .await + .test_get_blobs(false) + .await + .test_get_blobs(true) + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_blobs_post_fulu_supernode() { let mut config = ApiTesterConfig { retain_historic_states: false, spec: E::default_spec(), + // For supernode, we import all data columns + import_all_data_columns: true, }; config.spec.altair_fork_epoch = Some(Epoch::new(0)); config.spec.bellatrix_fork_epoch = Some(Epoch::new(0)); config.spec.capella_fork_epoch = Some(Epoch::new(0)); config.spec.deneb_fork_epoch = Some(Epoch::new(0)); + config.spec.electra_fork_epoch = Some(Epoch::new(0)); + config.spec.fulu_fork_epoch = Some(Epoch::new(0)); ApiTester::new_from_config(config) .await - .test_post_beacon_blocks_valid() + // We can call the same get_blobs function in this test + // because the function will call get_blobs_by_versioned_hashes which handles peerDAS post-Fulu + .test_get_blobs(false) .await - .test_get_blob_sidecars(false) + .test_get_blobs(true) + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_blobs_post_fulu_full_node() { + let mut config = ApiTesterConfig::default(); + config.spec.altair_fork_epoch = Some(Epoch::new(0)); + config.spec.bellatrix_fork_epoch = Some(Epoch::new(0)); + config.spec.capella_fork_epoch = Some(Epoch::new(0)); + config.spec.deneb_fork_epoch = Some(Epoch::new(0)); + config.spec.electra_fork_epoch = Some(Epoch::new(0)); + config.spec.fulu_fork_epoch = Some(Epoch::new(0)); + + ApiTester::new_from_config(config) .await - .test_get_blob_sidecars(true) + .test_get_blobs_post_fulu_full_node(false) + .await + .test_get_blobs_post_fulu_full_node(true) .await; } diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 3368569d59f..0423794d0d5 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -1336,7 +1336,7 @@ impl BeaconNodeHttpClient { } /// Path for `v1/beacon/blob_sidecars/{block_id}` - pub fn get_blobs_path(&self, block_id: BlockId) -> Result { + pub fn get_blob_sidecars_path(&self, block_id: BlockId) -> Result { let mut path = self.eth_path(V1)?; path.path_segments_mut() .map_err(|()| Error::InvalidUrl(self.server.clone()))? @@ -1346,6 +1346,17 @@ impl BeaconNodeHttpClient { Ok(path) } + /// Path for `v1/beacon/blobs/{blob_id}` + pub fn get_blobs_path(&self, block_id: BlockId) -> Result { + let mut path = self.eth_path(V1)?; + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("blobs") + .push(&block_id.to_string()); + Ok(path) + } + /// Path for `v1/beacon/blinded_blocks/{block_id}` pub fn get_beacon_blinded_blocks_path(&self, block_id: BlockId) -> Result { let mut path = self.eth_path(V1)?; @@ -1374,13 +1385,13 @@ impl BeaconNodeHttpClient { /// `GET v1/beacon/blob_sidecars/{block_id}` /// /// Returns `Ok(None)` on a 404 error. - pub async fn get_blobs( + pub async fn get_blob_sidecars( &self, block_id: BlockId, indices: Option<&[u64]>, spec: &ChainSpec, ) -> Result>>, Error> { - let mut path = self.get_blobs_path(block_id)?; + let mut path = self.get_blob_sidecars_path(block_id)?; if let Some(indices) = indices { let indices_string = indices .iter() @@ -1400,6 +1411,31 @@ impl BeaconNodeHttpClient { .map(|opt| opt.map(BeaconResponse::ForkVersioned)) } + /// `GET v1/beacon/blobs/{block_id}` + /// + /// Returns `Ok(None)` on a 404 error. + pub async fn get_blobs( + &self, + block_id: BlockId, + versioned_hashes: Option<&[Hash256]>, + ) -> Result>>>, Error> + { + let mut path = self.get_blobs_path(block_id)?; + if let Some(hashes) = versioned_hashes { + let hashes_string = hashes + .iter() + .map(|hash| hash.to_string()) + .collect::>() + .join(","); + path.query_pairs_mut() + .append_pair("versioned_hashes", &hashes_string); + } + + self.get_opt(path) + .await + .map(|opt| opt.map(BeaconResponse::Unversioned)) + } + /// `GET v1/beacon/blinded_blocks/{block_id}` /// /// Returns `Ok(None)` on a 404 error. diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index b72ab293801..8f553b57d9c 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -716,6 +716,13 @@ pub struct BlobIndicesQuery { pub indices: Option>, } +#[derive(Clone, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct BlobsVersionedHashesQuery { + #[serde(default, deserialize_with = "option_query_vec")] + pub versioned_hashes: Option>, +} + #[derive(Clone, Deserialize)] #[serde(deny_unknown_fields)] pub struct DataColumnIndicesQuery { @@ -2317,6 +2324,14 @@ pub struct StandardAttestationRewards { pub total_rewards: Vec, } +#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode)] +#[serde(bound = "E: EthSpec")] +#[serde(transparent)] +pub struct BlobWrapper { + #[serde(with = "ssz_types::serde_utils::hex_fixed_vec")] + pub blob: Blob, +} + #[cfg(test)] mod test { use std::fmt::Debug; diff --git a/consensus/types/src/beacon_response.rs b/consensus/types/src/beacon_response.rs index 2e458543649..fc59fc94329 100644 --- a/consensus/types/src/beacon_response.rs +++ b/consensus/types/src/beacon_response.rs @@ -25,6 +25,7 @@ pub struct ForkVersionedResponse { /// `Deserialize`. #[derive(Debug, PartialEq, Clone, Serialize)] pub struct UnversionedResponse { + #[serde(flatten)] pub metadata: M, pub data: T, } @@ -195,9 +196,10 @@ impl From> for BeaconResponse { #[cfg(test)] mod fork_version_response_tests { + use crate::beacon_response::ExecutionOptimisticFinalizedMetadata; use crate::{ ExecutionPayload, ExecutionPayloadBellatrix, ForkName, ForkVersionedResponse, - MainnetEthSpec, + MainnetEthSpec, UnversionedResponse, }; use serde_json::json; @@ -236,4 +238,24 @@ mod fork_version_response_tests { assert!(result.is_err()); } + + // The following test should only pass by having the attribute #[serde(flatten)] on the metadata + #[test] + fn unversioned_response_serialize_dezerialize_round_trip_test() { + // Create an UnversionedResponse with some data + let data = UnversionedResponse { + metadata: ExecutionOptimisticFinalizedMetadata { + execution_optimistic: Some(false), + finalized: Some(false), + }, + data: "some_test_data".to_string(), + }; + + let serialized = serde_json::to_string(&data); + + let deserialized = + serde_json::from_str(&serialized.unwrap()).expect("Failed to deserialize"); + + assert_eq!(data, deserialized); + } } diff --git a/lcli/src/http_sync.rs b/lcli/src/http_sync.rs index 2e36eadf235..6f7dcdb5956 100644 --- a/lcli/src/http_sync.rs +++ b/lcli/src/http_sync.rs @@ -124,7 +124,7 @@ async fn get_block_from_source( .unwrap() .unwrap(); let blobs_from_source = source - .get_blobs::(block_id, None, spec) + .get_blob_sidecars::(block_id, None, spec) .await .unwrap() .unwrap() diff --git a/testing/simulator/src/checks.rs b/testing/simulator/src/checks.rs index 1368c495cd8..1240785121a 100644 --- a/testing/simulator/src/checks.rs +++ b/testing/simulator/src/checks.rs @@ -424,7 +424,7 @@ pub async fn verify_full_blob_production_up_to( // the `verify_full_block_production_up_to` function. if block.is_some() { remote_node - .get_blobs::(BlockId::Slot(Slot::new(slot)), None, &E::default_spec()) + .get_blobs::(BlockId::Slot(Slot::new(slot)), None) .await .map_err(|e| format!("Failed to get blobs at slot {slot:?}: {e:?}"))? .ok_or_else(|| format!("No blobs available at slot {slot:?}"))?;