diff --git a/Cargo.lock b/Cargo.lock index 2a5671e023..0d5a2dc56d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1341,6 +1341,16 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "155a5a185e42c6b77ac7b88a15143d930a9e9727a5b7b77eed417404ab15c247" +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "async-channel" version = "2.5.0" @@ -1395,6 +1405,16 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-object-pool" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1ac0219111eb7bb7cb76d4cf2cb50c598e7ae549091d3616f9e95442c18486f" +dependencies = [ + "async-lock", + "event-listener", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -3697,6 +3717,30 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "headers" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3314d5adb5d94bcdf56771f2e50dbbc80bb4bdf88967526706205ac9eff24eb" +dependencies = [ + "base64", + "bytes", + "headers-core", + "http", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http", +] + [[package]] name = "heck" version = "0.5.0" @@ -3849,6 +3893,40 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "httpmock" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "511f510e9b1888d67f10bab4397f8b019d2a9b249a2c10acbce2d705b1b32e26" +dependencies = [ + "assert-json-diff", + "async-object-pool", + "async-trait", + "base64", + "bytes", + "crossbeam-utils", + "form_urlencoded", + "futures-timer", + "futures-util", + "headers", + "http", + "http-body-util", + "hyper", + "hyper-util", + "path-tree", + "regex", + "serde", + "serde_json", + "serde_regex", + "similar", + "stringmetrics", + "tabwriter", + "thiserror 2.0.17", + "tokio", + "tracing", + "url", +] + [[package]] name = "hyper" version = "1.8.1" @@ -5326,6 +5404,7 @@ dependencies = [ "async-trait", "c-kzg", "http-body-util", + "httpmock", "kona-derive", "kona-genesis", "kona-macros", @@ -5336,6 +5415,7 @@ dependencies = [ "op-alloy-network", "reqwest", "serde", + "serde_json", "thiserror 2.0.17", "tokio", "tower 0.5.2", @@ -7322,6 +7402,15 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "path-tree" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a97453bc21a968f722df730bfe11bd08745cb50d1300b0df2bda131dece136" +dependencies = [ + "smallvec", +] + [[package]] name = "pem" version = "3.0.6" @@ -10426,6 +10515,16 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_regex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8136f1a4ea815d7eac4101cfd0b16dc0cb5e1fe1b8609dfd728058656b7badf" +dependencies = [ + "regex", + "serde", +] + [[package]] name = "serde_repr" version = "0.1.20" @@ -10587,6 +10686,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" +[[package]] +name = "similar" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbbb5d9659141646ae647b42fe094daf6c6192d1620870b449d9557f748b2daa" + [[package]] name = "simple_asn1" version = "0.6.3" @@ -10732,6 +10837,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb" +[[package]] +name = "stringmetrics" +version = "2.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b3c8667cd96245cbb600b8dec5680a7319edd719c5aa2b5d23c6bff94f39765" + [[package]] name = "strsim" version = "0.11.1" @@ -10900,6 +11011,15 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "tabwriter" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fce91f2f0ec87dff7e6bcbbeb267439aa1188703003c6055193c821487400432" +dependencies = [ + "unicode-width", +] + [[package]] name = "tagptr" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index d5832cb4a8..1e5faa394f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -253,6 +253,7 @@ arbtest = "0.3.2" proptest = "1.9.0" criterion = "0.5.1" mockall = "0.14.0" +httpmock = "0.8.2" # Serialization rkyv = "0.8.12" diff --git a/crates/providers/providers-alloy/Cargo.toml b/crates/providers/providers-alloy/Cargo.toml index 48139b18bd..734dd25508 100644 --- a/crates/providers/providers-alloy/Cargo.toml +++ b/crates/providers/providers-alloy/Cargo.toml @@ -58,3 +58,5 @@ metrics = [ "dep:metrics", "kona-derive/metrics" ] [dev-dependencies] tokio.workspace = true +httpmock.workspace = true +serde_json.workspace = true diff --git a/crates/providers/providers-alloy/src/beacon_client.rs b/crates/providers/providers-alloy/src/beacon_client.rs index 0708a22275..cacaf9d511 100644 --- a/crates/providers/providers-alloy/src/beacon_client.rs +++ b/crates/providers/providers-alloy/src/beacon_client.rs @@ -3,11 +3,14 @@ #[cfg(feature = "metrics")] use crate::Metrics; use crate::blobs::BoxedBlobWithIndex; -use alloy_eips::eip4844::IndexedBlobHash; -use alloy_rpc_types_beacon::sidecar::{BeaconBlobBundle, GetBlobsResponse}; +use alloy_eips::eip4844::{IndexedBlobHash, env_settings::EnvKzgSettings, kzg_to_versioned_hash}; +use alloy_primitives::{B256, FixedBytes}; +use alloy_rpc_types_beacon::sidecar::GetBlobsResponse; use async_trait::async_trait; +use c_kzg::Blob; use reqwest::Client; -use std::{boxed::Box, format, string::String, vec::Vec}; +use std::{boxed::Box, collections::HashMap, format, string::String, vec::Vec}; +use thiserror::Error; /// The config spec engine api method. const SPEC_METHOD: &str = "eth/v1/config/spec"; @@ -15,10 +18,7 @@ const SPEC_METHOD: &str = "eth/v1/config/spec"; /// The beacon genesis engine api method. const GENESIS_METHOD: &str = "eth/v1/beacon/genesis"; -/// The blob sidecars engine api method prefix. -const SIDECARS_METHOD_PREFIX_DEPRECATED: &str = "eth/v1/beacon/blob_sidecars"; - -/// THe blobs engine api method prefix. +/// The blobs engine api method prefix. const BLOBS_METHOD_PREFIX: &str = "eth/v1/beacon/blobs"; /// A reduced genesis data. @@ -88,6 +88,32 @@ pub trait BeaconClient { ) -> Result, Self::Error>; } +const BLOB_SIZE: usize = 131072; + +/// [`blob_versioned_hash`] computes the versioned hash of a blob. +fn blob_versioned_hash(blob: &FixedBytes) -> Result { + let kzg_settings = EnvKzgSettings::Default; + let kzg_blob = Blob::new(blob.0); + let commitment = kzg_settings.get().blob_to_kzg_commitment(&kzg_blob)?; + Ok(kzg_to_versioned_hash(commitment.as_slice())) +} + +/// An error that can occur when interacting with the beacon client. +#[derive(Error, Debug)] +pub enum BeaconClientError { + /// HTTP request failed. + #[error("HTTP request failed: {0}")] + Http(#[from] reqwest::Error), + + /// Blob hash not found in beacon response. + #[error("Blob hash not found in beacon response: {0}")] + BlobNotFound(String), + + /// KZG error. + #[error("KZG error: {0}")] + KZG(#[from] c_kzg::Error), +} + /// An online implementation of the [BeaconClient] trait. #[derive(Debug, Clone)] pub struct OnlineBeaconClient { @@ -121,59 +147,52 @@ impl OnlineBeaconClient { self } + /// Fetches only the blobs corresponding to the provided (versioned) blob hashes + /// from the beacon [`BLOBS_METHOD_PREFIX`] endpoint. + /// Blobs are validated against the supplied versioned hashes + /// and returned in the same order as the input. async fn filtered_beacon_blobs( &self, slot: u64, blob_hashes: &[IndexedBlobHash], - ) -> Result, reqwest::Error> { - let blob_indexes = blob_hashes.iter().map(|blob| blob.index).collect::>(); - - Ok( - match self - .inner - .get(format!("{}/{}/{}", self.base, BLOBS_METHOD_PREFIX, slot)) - .send() - .await - { - Ok(response) if response.status().is_success() => { - let bundle = response.json::().await?; - - bundle - .data - .into_iter() - .enumerate() - .filter_map(|(index, blob)| { - let index = index as u64; - blob_indexes - .contains(&index) - .then_some(BoxedBlobWithIndex { index, blob: Box::new(blob) }) - }) - .collect::>() - } - // If the blobs endpoint fails, try the deprecated sidecars endpoint. CL Clients - // only support the blobs endpoint from Fusaka (Fulu) onwards. - _ => self - .inner - .get(format!("{}/{}/{}", self.base, SIDECARS_METHOD_PREFIX_DEPRECATED, slot)) - .send() - .await? - .json::() - .await? - .into_iter() - .filter_map(|blob| { - blob_indexes - .contains(&blob.index) - .then_some(BoxedBlobWithIndex { index: blob.index, blob: blob.blob }) - }) - .collect::>(), - }, - ) + ) -> Result, BeaconClientError> { + let params = blob_hashes.iter().map(|blob| blob.hash.to_string()).collect::>(); + let response = self + .inner + .get(format!("{}/{}/{}", self.base, BLOBS_METHOD_PREFIX, slot)) + .query(&[("versioned_hashes", ¶ms.join(","))]) + .send() + .await? + .error_for_status()?; + let bundle = response.json::().await?; + + let returned_blobs_mapped_by_hash = bundle + .data + .iter() + .map(|data| -> Result<_, BeaconClientError> { + let recomputed_hash = blob_versioned_hash(data)?; + Ok((recomputed_hash, data)) + }) + .collect::, BeaconClientError>>()?; + + // Map the input (blob_hashes) into the output, + // finding the blob from the response + // whose hash matches the input: + blob_hashes + .iter() + .map(|blob_hash| -> Result { + let matching_data = returned_blobs_mapped_by_hash + .get(&blob_hash.hash) + .ok_or(BeaconClientError::BlobNotFound(blob_hash.hash.to_string()))?; + Ok(BoxedBlobWithIndex { blob: Box::new(**matching_data), index: blob_hash.index }) + }) + .collect::, BeaconClientError>>() } } #[async_trait] impl BeaconClient for OnlineBeaconClient { - type Error = reqwest::Error; + type Error = BeaconClientError; async fn slot_interval(&self) -> Result { kona_macros::inc!(gauge, Metrics::BEACON_CLIENT_REQUESTS, "method" => "spec"); @@ -193,7 +212,7 @@ impl BeaconClient for OnlineBeaconClient { kona_macros::inc!(gauge, Metrics::BEACON_CLIENT_ERRORS, "method" => "spec"); } - result + Ok(result?) } async fn genesis_time(&self) -> Result { @@ -209,14 +228,14 @@ impl BeaconClient for OnlineBeaconClient { kona_macros::inc!(gauge, Metrics::BEACON_CLIENT_ERRORS, "method" => "genesis"); } - result + Ok(result?) } async fn filtered_beacon_blobs( &self, slot: u64, blob_hashes: &[IndexedBlobHash], - ) -> Result, Self::Error> { + ) -> Result, BeaconClientError> { kona_macros::inc!(gauge, Metrics::BEACON_CLIENT_REQUESTS, "method" => "blobs"); // Try to get the blobs from the blobs endpoint. @@ -229,3 +248,90 @@ impl BeaconClient for OnlineBeaconClient { result } } + +#[cfg(test)] +mod tests { + use super::*; + use alloy_consensus::Blob; + use alloy_primitives::{FixedBytes, hex::FromHex}; + use httpmock::prelude::*; + use serde_json::json; + + const TEST_BLOB_DATA: Blob = FixedBytes::repeat_byte(1); + const TEST_BLOB_HASH_HEX: &str = + "0x016c357b8b3a6b3fd82386e7bebf77143d537cdb1c856509661c412602306a04"; + + #[test] + fn test_blob_versioned_hash() { + let input: Blob = FixedBytes::repeat_byte(1); + let test_blob_hash: FixedBytes<32> = FixedBytes::from_hex(TEST_BLOB_HASH_HEX).unwrap(); + assert_eq!(test_blob_hash, blob_versioned_hash(&input).unwrap()); + } + + #[tokio::test] + async fn test_filtered_beacon_blobs() { + let slot = 987654321; + let slot_string = slot.to_string(); + let repeated_blob_data: Vec = vec![TEST_BLOB_DATA, TEST_BLOB_DATA]; + let garbage_blob_data: Vec = vec![FixedBytes::repeat_byte(2)]; + let required_query_param = format!("{TEST_BLOB_HASH_HEX},{TEST_BLOB_HASH_HEX}"); + let test_blob_hash: FixedBytes<32> = FixedBytes::from_hex(TEST_BLOB_HASH_HEX).unwrap(); + let requested_blob_hashes: Vec = vec![ + IndexedBlobHash { index: 0, hash: test_blob_hash }, + IndexedBlobHash { index: 2, hash: test_blob_hash }, + ]; + + struct TestCase { + name: &'static str, + mock_response_data: Vec, + want: Option>, // if none, expect an error + } + + let test_cases = vec![ + TestCase { + name: "Repeated Blob Data, expect success", + mock_response_data: repeated_blob_data, + want: Some(vec![ + BoxedBlobWithIndex { index: 0, blob: Box::new(TEST_BLOB_DATA) }, + BoxedBlobWithIndex { index: 2, blob: Box::new(TEST_BLOB_DATA) }, + ]), + }, + TestCase { + name: "Garbage Blob Data, expect error", + mock_response_data: garbage_blob_data, + want: None, // indicates an error is expected + }, + ]; + + let server = MockServer::start(); + for test_case in test_cases { + // This server mocks a single, specific query on a beacon node, + let mock_response = json!({ + "execution_optimistic": false, + "finalized": false, + "data": test_case.mock_response_data + }); + let mut blobs_mock = server.mock(|when, then| { + when.method(GET) + .path(format!("/eth/v1/beacon/blobs/{slot_string}")) + .query_param("versioned_hashes", required_query_param.clone()); + then.status(200).json_body(mock_response); + }); + + let client = OnlineBeaconClient::new_http(server.base_url()); + let response = client.filtered_beacon_blobs(slot, &requested_blob_hashes).await; + blobs_mock.assert(); + match test_case.want { + Some(s) => { + let r = response.unwrap(); + assert_eq!(r.len(), s.len(), "length mismatch{}", test_case.name); + assert_eq!(r, s, "{}", test_case.name) + } + None => { + assert!(response.is_err(), "{}", test_case.name) + } + } + blobs_mock.delete(); + } + } +} diff --git a/crates/providers/providers-alloy/src/blobs.rs b/crates/providers/providers-alloy/src/blobs.rs index 0e1b455cb8..022a22ac0d 100644 --- a/crates/providers/providers-alloy/src/blobs.rs +++ b/crates/providers/providers-alloy/src/blobs.rs @@ -75,7 +75,7 @@ impl OnlineBlobProvider { slot: u64, blob_hashes: &[IndexedBlobHash], ) -> Result, BlobProviderError> { - kona_macros::inc!(gauge, Metrics::BLOB_SIDECAR_FETCHES); + kona_macros::inc!(gauge, Metrics::BLOB_FETCHES); let result = self .beacon_client @@ -85,7 +85,7 @@ impl OnlineBlobProvider { #[cfg(feature = "metrics")] if result.is_err() { - kona_macros::inc!(gauge, Metrics::BLOB_SIDECAR_FETCH_ERRORS); + kona_macros::inc!(gauge, Metrics::BLOB_FETCH_ERRORS); } result diff --git a/crates/providers/providers-alloy/src/lib.rs b/crates/providers/providers-alloy/src/lib.rs index 16bab6af91..c3559a6dfe 100644 --- a/crates/providers/providers-alloy/src/lib.rs +++ b/crates/providers/providers-alloy/src/lib.rs @@ -7,6 +7,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] mod metrics; +pub use beacon_client::BeaconClientError; pub use metrics::Metrics; mod beacon_client; diff --git a/crates/providers/providers-alloy/src/metrics.rs b/crates/providers/providers-alloy/src/metrics.rs index 17c8ac6752..994d460c14 100644 --- a/crates/providers/providers-alloy/src/metrics.rs +++ b/crates/providers/providers-alloy/src/metrics.rs @@ -29,11 +29,11 @@ impl Metrics { /// Identifier for the gauge that tracks L2 chain provider errors. pub const L2_CHAIN_PROVIDER_ERRORS: &str = "kona_providers_l2_chain_errors"; - /// Identifier for the gauge that tracks blob sidecar fetches. - pub const BLOB_SIDECAR_FETCHES: &str = "kona_providers_blob_sidecar_fetches"; + /// Identifier for the gauge that tracks blob fetches. + pub const BLOB_FETCHES: &str = "kona_providers_blob_fetches"; - /// Identifier for the gauge that tracks blob sidecar fetch errors. - pub const BLOB_SIDECAR_FETCH_ERRORS: &str = "kona_providers_blob_sidecar_errors"; + /// Identifier for the gauge that tracks blob fetch errors. + pub const BLOB_FETCH_ERRORS: &str = "kona_providers_blob_fetch_errors"; /// Identifier for the histogram that tracks provider request duration. pub const PROVIDER_REQUEST_DURATION: &str = "kona_providers_request_duration"; @@ -90,11 +90,8 @@ impl Metrics { Self::L2_CHAIN_PROVIDER_ERRORS, "Number of errors in L2 chain provider requests" ); - metrics::describe_gauge!(Self::BLOB_SIDECAR_FETCHES, "Number of blob sidecar fetches"); - metrics::describe_gauge!( - Self::BLOB_SIDECAR_FETCH_ERRORS, - "Number of blob sidecar fetch errors" - ); + metrics::describe_gauge!(Self::BLOB_FETCHES, "Number of blob sidecar fetches"); + metrics::describe_gauge!(Self::BLOB_FETCH_ERRORS, "Number of blob sidecar fetch errors"); metrics::describe_histogram!( Self::PROVIDER_REQUEST_DURATION, "Duration of provider requests in seconds" @@ -195,8 +192,8 @@ impl Metrics { ); // Blob sidecar metrics - kona_macros::set!(gauge, Self::BLOB_SIDECAR_FETCHES, 0); - kona_macros::set!(gauge, Self::BLOB_SIDECAR_FETCH_ERRORS, 0); + kona_macros::set!(gauge, Self::BLOB_FETCHES, 0); + kona_macros::set!(gauge, Self::BLOB_FETCH_ERRORS, 0); // Cache metrics kona_macros::set!(gauge, Self::CACHE_ENTRIES, "cache", "header_by_hash", 0);