Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 77 additions & 12 deletions crates/optimism/rpc/src/historical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use alloy_eips::BlockId;
use alloy_json_rpc::{RpcRecv, RpcSend};
use alloy_primitives::{BlockNumber, B256};
use alloy_rpc_client::RpcClient;
use jsonrpsee::BatchResponseBuilder;
use jsonrpsee_core::{
middleware::{Batch, Notification, RpcServiceT},
middleware::{Batch, BatchEntry, Notification, RpcServiceT},
server::MethodResponse,
};
use jsonrpsee_types::{Params, Request};
Expand Down Expand Up @@ -122,8 +123,14 @@ impl<S, P> HistoricalRpcService<S, P> {

impl<S, P> RpcServiceT for HistoricalRpcService<S, P>
where
S: RpcServiceT<MethodResponse = MethodResponse> + Send + Sync + Clone + 'static,

S: RpcServiceT<
MethodResponse = MethodResponse,
BatchResponse = MethodResponse,
NotificationResponse = MethodResponse,
> + Send
+ Sync
+ Clone
+ 'static,
P: BlockReaderIdExt + TransactionsProvider + Send + Sync + Clone + 'static,
{
type MethodResponse = S::MethodResponse;
Expand All @@ -145,8 +152,64 @@ where
})
}

fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
self.inner.batch(req)
fn batch<'a>(
&self,
mut req: Batch<'a>,
) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
let this = self.clone();
let historical = self.historical.clone();

async move {
let mut needs_forwarding = false;
for entry in req.iter_mut() {
if let Ok(BatchEntry::Call(call)) = entry &&
historical.should_forward_request(call)
{
needs_forwarding = true;
break;
}
}

if !needs_forwarding {
// no call needs to be forwarded and we can simply perform this batch request
return this.inner.batch(req).await;
}

// the entire response is checked above so we can assume that these don't exceed
let mut batch_rp = BatchResponseBuilder::new_with_limit(usize::MAX);
let mut got_notification = false;

for batch_entry in req {
match batch_entry {
Ok(BatchEntry::Call(req)) => {
let rp = this.call(req).await;
if let Err(err) = batch_rp.append(rp) {
return err;
}
}
Ok(BatchEntry::Notification(n)) => {
got_notification = true;
this.notification(n).await;
}
Err(err) => {
let (err, id) = err.into_parts();
let rp = MethodResponse::error(id, err);
if let Err(err) = batch_rp.append(rp) {
return err;
}
}
}
}

// If the batch is empty and we got a notification, we return an empty response.
if batch_rp.is_empty() && got_notification {
MethodResponse::notification()
}
// An empty batch is regarded as an invalid request here.
else {
MethodResponse::from_batch(batch_rp.finish())
}
}
}

fn notification<'a>(
Expand All @@ -171,21 +234,23 @@ impl<P> HistoricalRpcInner<P>
where
P: BlockReaderIdExt + TransactionsProvider + Send + Sync + Clone,
{
/// Checks if a request should be forwarded to the historical endpoint and returns
/// the response if it was forwarded.
async fn maybe_forward_request(&self, req: &Request<'_>) -> Option<MethodResponse> {
let should_forward = match req.method_name() {
/// Checks if a request should be forwarded to the historical endpoint (synchronous check).
fn should_forward_request(&self, req: &Request<'_>) -> bool {
match req.method_name() {
"debug_traceTransaction" |
"eth_getTransactionByHash" |
"eth_getTransactionReceipt" |
"eth_getRawTransactionByHash" => self.should_forward_transaction(req),
method => self.should_forward_block_request(method, req),
};
}
}
Comment thread
reject-i marked this conversation as resolved.

if should_forward {
/// Checks if a request should be forwarded to the historical endpoint and returns
/// the response if it was forwarded.
async fn maybe_forward_request(&self, req: &Request<'_>) -> Option<MethodResponse> {
if self.should_forward_request(req) {
return self.forward_to_historical(req).await
}

None
}

Expand Down