Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/node/builder/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,7 @@ where
version: version_metadata().cargo_pkg_version.to_string(),
commit: version_metadata().vergen_git_sha.to_string(),
};

Ok(EngineApi::new(
ctx.node.provider().clone(),
ctx.config.chain.clone(),
Expand All @@ -1392,6 +1393,7 @@ where
EngineCapabilities::default(),
engine_validator,
ctx.config.engine.accept_execution_requests_hash,
ctx.node.network().clone(),
))
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/optimism/node/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ where
EngineCapabilities::new(OP_ENGINE_CAPABILITIES.iter().copied()),
engine_validator,
ctx.config.engine.accept_execution_requests_hash,
ctx.node.network().clone(),
);

Ok(OpEngineApi::new(inner))
Expand Down
12 changes: 12 additions & 0 deletions crates/rpc/rpc-api/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,18 @@ pub trait EngineApi<Engine: EngineTypes> {
&self,
versioned_hashes: Vec<B256>,
) -> RpcResult<Option<Vec<BlobAndProofV2>>>;

/// Fetch blobs for the consensus layer from the blob store.
///
/// Returns a response of the same length as the request. Missing or older-version blobs are
/// returned as `null` elements.
///
/// Returns `null` if syncing.
#[method(name = "getBlobsV3")]
async fn get_blobs_v3(
&self,
versioned_hashes: Vec<B256>,
) -> RpcResult<Option<Vec<Option<BlobAndProofV2>>>>;
}

/// A subset of the ETH rpc interface: <https://ethereum.github.io/execution-apis/api-documentation>
Expand Down
1 change: 1 addition & 0 deletions crates/rpc/rpc-builder/tests/it/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub async fn launch_auth(secret: JwtSecret) -> AuthServerHandle {
EngineCapabilities::default(),
EthereumEngineValidator::new(MAINNET.clone()),
false,
NoopNetwork::default(),
);
let module = AuthRpcModule::new(engine_api);
module.start_server(config).await.unwrap()
Expand Down
1 change: 1 addition & 0 deletions crates/rpc/rpc-engine-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ reth-tasks.workspace = true
reth-engine-primitives.workspace = true
reth-transaction-pool.workspace = true
reth-primitives-traits.workspace = true
reth-network-api.workspace = true

# ethereum
alloy-eips.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/rpc/rpc-engine-api/src/capabilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub const CAPABILITIES: &[&str] = &[
"engine_getPayloadBodiesByRangeV1",
"engine_getBlobsV1",
"engine_getBlobsV2",
"engine_getBlobsV3",
];

// The list of all supported Engine capabilities available over the engine endpoint.
Expand Down
140 changes: 139 additions & 1 deletion crates/rpc/rpc-engine-api/src/engine_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use async_trait::async_trait;
use jsonrpsee_core::{server::RpcModule, RpcResult};
use reth_chainspec::EthereumHardforks;
use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
use reth_network_api::NetworkInfo;
use reth_payload_builder::PayloadStore;
use reth_payload_primitives::{
validate_payload_timestamp, EngineApiMessageVersion, MessageValidationKind,
Expand Down Expand Up @@ -94,7 +95,9 @@ where
capabilities: EngineCapabilities,
validator: Validator,
accept_execution_requests_hash: bool,
network: impl NetworkInfo + 'static,
) -> Self {
let is_syncing = Arc::new(move || network.is_syncing());
let inner = Arc::new(EngineApiInner {
provider,
chain_spec,
Expand All @@ -107,6 +110,7 @@ where
tx_pool,
validator,
accept_execution_requests_hash,
is_syncing,
});
Self { inner }
}
Expand Down Expand Up @@ -792,6 +796,35 @@ where
.map_err(|err| EngineApiError::Internal(Box::new(err)))
}

fn get_blobs_v3(
&self,
versioned_hashes: Vec<B256>,
) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
// Check if Osaka fork is active
let current_timestamp =
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
return Err(EngineApiError::EngineObjectValidationError(
reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
));
}

if versioned_hashes.len() > MAX_BLOB_LIMIT {
return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
}

// Spec requires returning `null` if syncing.
if (*self.inner.is_syncing)() {
return Ok(None)
}
Comment on lines +816 to +819
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I see, now this makes sense


self.inner
.tx_pool
.get_blobs_for_versioned_hashes_v3(&versioned_hashes)
.map(Some)
.map_err(|err| EngineApiError::Internal(Box::new(err)))
}

/// Metered version of `get_blobs_v2`.
pub fn get_blobs_v2_metered(
&self,
Expand Down Expand Up @@ -827,6 +860,27 @@ where

res
}

/// Metered version of `get_blobs_v3`.
pub fn get_blobs_v3_metered(
&self,
versioned_hashes: Vec<B256>,
) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
let hashes_len = versioned_hashes.len();
let start = Instant::now();
let res = Self::get_blobs_v3(self, versioned_hashes);
self.inner.metrics.latency.get_blobs_v3.record(start.elapsed());

if let Ok(Some(blobs)) = &res {
let blobs_found = blobs.iter().flatten().count();
let blobs_missed = hashes_len - blobs_found;

self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
}

res
}
}

// This is the concrete ethereum engine API implementation.
Expand Down Expand Up @@ -1099,6 +1153,14 @@ where
trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
Ok(self.get_blobs_v2_metered(versioned_hashes)?)
}

async fn get_blobs_v3(
&self,
versioned_hashes: Vec<B256>,
) -> RpcResult<Option<Vec<Option<BlobAndProofV2>>>> {
trace!(target: "rpc::engine", "Serving engine_getBlobsV3");
Ok(self.get_blobs_v3_metered(versioned_hashes)?)
}
}

impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
Expand Down Expand Up @@ -1155,17 +1217,22 @@ struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSp
/// Engine validator.
validator: Validator,
accept_execution_requests_hash: bool,
/// Returns `true` if the node is currently syncing.
is_syncing: Arc<dyn Fn() -> bool + Send + Sync>,
}

#[cfg(test)]
mod tests {
use super::*;
use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
use assert_matches::assert_matches;
use reth_chainspec::{ChainSpec, MAINNET};
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_engine_primitives::BeaconEngineMessage;
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_ethereum_primitives::Block;
use reth_network_api::{
noop::NoopNetwork, EthProtocolInfo, NetworkError, NetworkInfo, NetworkStatus,
};
use reth_node_ethereum::EthereumEngineValidator;
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_provider::test_utils::MockEthProvider;
Expand Down Expand Up @@ -1206,6 +1273,7 @@ mod tests {
EngineCapabilities::default(),
EthereumEngineValidator::new(chain_spec.clone()),
false,
NoopNetwork::default(),
);
let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
(handle, api)
Expand Down Expand Up @@ -1247,6 +1315,76 @@ mod tests {
assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
}

#[derive(Clone)]
struct TestNetworkInfo {
syncing: bool,
}

impl NetworkInfo for TestNetworkInfo {
fn local_addr(&self) -> std::net::SocketAddr {
(std::net::Ipv4Addr::UNSPECIFIED, 0).into()
}

async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
#[allow(deprecated)]
Ok(NetworkStatus {
client_version: "test".to_string(),
protocol_version: 5,
eth_protocol_info: EthProtocolInfo {
network: 1,
difficulty: None,
genesis: Default::default(),
config: Default::default(),
head: Default::default(),
},
capabilities: vec![],
})
}

fn chain_id(&self) -> u64 {
1
}

fn is_syncing(&self) -> bool {
self.syncing
}

fn is_initially_syncing(&self) -> bool {
self.syncing
}
}

#[tokio::test]
async fn get_blobs_v3_returns_null_when_syncing() {
let chain_spec: Arc<ChainSpec> =
Arc::new(ChainSpecBuilder::mainnet().osaka_activated().build());
let provider = Arc::new(MockEthProvider::default());
let payload_store = spawn_test_payload_service::<EthEngineTypes>();
let (to_engine, _engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();

let api = EngineApi::new(
provider,
chain_spec.clone(),
ConsensusEngineHandle::new(to_engine),
payload_store.into(),
NoopTransactionPool::default(),
Box::<TokioTaskExecutor>::default(),
ClientVersionV1 {
code: ClientCode::RH,
name: "Reth".to_string(),
version: "v0.0.0-test".to_string(),
commit: "test".to_string(),
},
EngineCapabilities::default(),
EthereumEngineValidator::new(chain_spec),
false,
TestNetworkInfo { syncing: true },
);

let res = api.get_blobs_v3_metered(vec![B256::ZERO]);
assert_matches!(res, Ok(None));
}

// tests covering `engine_getPayloadBodiesByRange` and `engine_getPayloadBodiesByHash`
mod get_payload_bodies {
use super::*;
Expand Down
2 changes: 2 additions & 0 deletions crates/rpc/rpc-engine-api/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub(crate) struct EngineApiLatencyMetrics {
pub(crate) get_blobs_v1: Histogram,
/// Latency for `engine_getBlobsV2`
pub(crate) get_blobs_v2: Histogram,
/// Latency for `engine_getBlobsV3`
pub(crate) get_blobs_v3: Histogram,
}

#[derive(Metrics)]
Expand Down
Loading
Loading