From df7964f2435cfddd3212cd32d46fd937cad05c2e Mon Sep 17 00:00:00 2001 From: yottaes Date: Sun, 12 Apr 2026 03:30:50 +0700 Subject: [PATCH 1/2] feat(rpc): support transactionReceipts subscription in eth_subscribe Adds handler for SubscriptionKind::TransactionReceipts that streams RPC-formatted receipts for each new block. Supports optional transactionHashes filter parameter. Includes temporary alloy patches and amsterdam header field fixes. These will be removed once alloy releases alloy-rs/alloy#3410 and reth bumps the dependency. --- crates/rpc/rpc-builder/tests/it/ws.rs | 8 ++ crates/rpc/rpc/src/eth/pubsub.rs | 111 ++++++++++++++++++++++++-- 2 files changed, 111 insertions(+), 8 deletions(-) diff --git a/crates/rpc/rpc-builder/tests/it/ws.rs b/crates/rpc/rpc-builder/tests/it/ws.rs index 6c6af7270a1..9c2685214f7 100644 --- a/crates/rpc/rpc-builder/tests/it/ws.rs +++ b/crates/rpc/rpc-builder/tests/it/ws.rs @@ -37,6 +37,14 @@ async fn test_eth_subscribe_all_supported_kinds_accept() { serde_json::json!({"topics": ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"]}), ], ), + ("transactionReceipts", vec![]), + ("transactionReceipts", vec![serde_json::json!({"transactionHashes": []})]), + ( + "transactionReceipts", + vec![ + serde_json::json!({"transactionHashes": ["0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060"]}), + ], + ), ]; for (kind, params) in cases { diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index 77b806407f4..b9420e4310e 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -2,9 +2,12 @@ use std::sync::Arc; +use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt}; use alloy_primitives::TxHash; use alloy_rpc_types_eth::{ - pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata}, + pubsub::{ + Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata, TransactionReceiptsParams, + }, Filter, Log, }; use futures::StreamExt; @@ -13,7 +16,8 @@ use jsonrpsee::{ }; use reth_chain_state::CanonStateSubscriptions; use reth_network_api::NetworkInfo; -use reth_rpc_convert::RpcHeader; +use reth_primitives_traits::TransactionMeta; +use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcHeader}; use reth_rpc_eth_api::{ pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction, }; @@ -125,7 +129,7 @@ where }; std::future::ready(tx_value) }); - return pipe_from_stream(accepted_sink, stream).await + return pipe_from_stream(accepted_sink, stream).await; } Params::Bool(false) | Params::None => { // only hashes requested @@ -158,7 +162,7 @@ where .map_err(SubscriptionSerializeError::new)?; if accepted_sink.send(msg).await.is_err() { - return Ok(()) + return Ok(()); } while canon_state.next().await.is_some() { @@ -178,17 +182,108 @@ where .map_err(SubscriptionSerializeError::new)?; if accepted_sink.send(msg).await.is_err() { - break + break; } } } Ok(()) } - _ => { - // TODO: implement once https://github.com/alloy-rs/alloy/pull/3410 is released - Err(invalid_params_rpc_err("Unsupported subscription kind")) + SubscriptionKind::TransactionReceipts => { + let filter = match params { + Some(Params::TransactionReceipts(filter)) => filter, + None | Some(Params::None) => TransactionReceiptsParams::default(), + _ => { + return Err(invalid_params_rpc_err("Invalid params for transactionReceipts")) + } + }; + + let converter = self.inner.eth_api.converter(); + let stream = self.inner.eth_api.provider().canonical_state_stream().flat_map( + move |new_chain| { + // for each block in the new chain, build RPC receipts + let results: Vec<_> = new_chain + .committed() + .blocks_and_receipts() + .filter_map(|(block, receipts)| { + let block_hash = block.hash(); + let block_number = block.number(); + let base_fee = block.base_fee_per_gas(); + let excess_blob_gas = block.excess_blob_gas(); + let timestamp = block.timestamp(); + + let mut gas_used: u64 = 0; + let mut next_log_index: usize = 0; + + // build ConvertReceiptInput for each tx+receipt pair + // (same logic as eth_getBlockReceipts HTTP endpoint) + let inputs: Vec<_> = block + .transactions_recovered() + .zip(receipts.iter().cloned()) + .enumerate() + .filter(|(_, (tx, _))| { + // apply transaction hash filter if provided + match &filter.transaction_hashes { + Some(hashes) if !hashes.is_empty() => { + hashes.contains(tx.tx_hash()) + } + _ => true, + } + }) + .map(|(idx, (tx, receipt))| { + let meta = TransactionMeta { + tx_hash: *tx.tx_hash(), + index: idx as u64, + block_hash, + block_number, + base_fee, + excess_blob_gas, + timestamp, + }; + + let cumulative_gas_used = receipt.cumulative_gas_used(); + let logs_len = receipt.logs().len(); + + let input = ConvertReceiptInput { + tx, + gas_used: cumulative_gas_used - gas_used, + next_log_index, + meta, + receipt, + }; + + gas_used = cumulative_gas_used; + next_log_index += logs_len; + + input + }) + .collect(); + + if inputs.is_empty() { + return None; + } + + match converter.convert_receipts(inputs) { + Ok(rpc_receipts) => Some(rpc_receipts), + Err(err) => { + error!( + target = "rpc", + %err, + "Failed to convert receipts" + ); + None + } + } + }) + .collect(); + + futures::stream::iter(results) + }, + ); + + pipe_from_stream(accepted_sink, stream).await } + _ => Err(invalid_params_rpc_err("Unsupported subscription kind")), } } } From ab44f325d84e0a0d8fe5a954bbe2d0135fbd8427 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 14 Apr 2026 10:21:44 +0200 Subject: [PATCH 2/2] fix(rpc): preserve filtered receipt offsets --- crates/rpc/rpc/src/eth/pubsub.rs | 58 +++++++++++++++----------------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index b9420e4310e..19407f34099 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -129,7 +129,7 @@ where }; std::future::ready(tx_value) }); - return pipe_from_stream(accepted_sink, stream).await; + return pipe_from_stream(accepted_sink, stream).await } Params::Bool(false) | Params::None => { // only hashes requested @@ -162,7 +162,7 @@ where .map_err(SubscriptionSerializeError::new)?; if accepted_sink.send(msg).await.is_err() { - return Ok(()); + return Ok(()) } while canon_state.next().await.is_some() { @@ -182,7 +182,7 @@ where .map_err(SubscriptionSerializeError::new)?; if accepted_sink.send(msg).await.is_err() { - break; + break } } } @@ -219,43 +219,39 @@ where // (same logic as eth_getBlockReceipts HTTP endpoint) let inputs: Vec<_> = block .transactions_recovered() - .zip(receipts.iter().cloned()) + .zip(receipts.iter()) .enumerate() - .filter(|(_, (tx, _))| { + .filter_map(|(idx, (tx, receipt))| { + let gas_used_before = gas_used; + let next_log_index_before = next_log_index; + let cumulative_gas_used = receipt.cumulative_gas_used(); + + gas_used = cumulative_gas_used; + next_log_index += receipt.logs().len(); + // apply transaction hash filter if provided - match &filter.transaction_hashes { + let matches = match &filter.transaction_hashes { Some(hashes) if !hashes.is_empty() => { hashes.contains(tx.tx_hash()) } _ => true, - } - }) - .map(|(idx, (tx, receipt))| { - let meta = TransactionMeta { - tx_hash: *tx.tx_hash(), - index: idx as u64, - block_hash, - block_number, - base_fee, - excess_blob_gas, - timestamp, }; - let cumulative_gas_used = receipt.cumulative_gas_used(); - let logs_len = receipt.logs().len(); - - let input = ConvertReceiptInput { + matches.then(|| ConvertReceiptInput { tx, - gas_used: cumulative_gas_used - gas_used, - next_log_index, - meta, - receipt, - }; - - gas_used = cumulative_gas_used; - next_log_index += logs_len; - - input + gas_used: cumulative_gas_used - gas_used_before, + next_log_index: next_log_index_before, + meta: TransactionMeta { + tx_hash: *tx.tx_hash(), + index: idx as u64, + block_hash, + block_number, + base_fee, + excess_blob_gas, + timestamp, + }, + receipt: receipt.clone(), + }) }) .collect();