diff --git a/crates/rpc/rpc-builder/tests/it/ws.rs b/crates/rpc/rpc-builder/tests/it/ws.rs index 6c6af7270a1..9c2685214f7 100644 --- a/crates/rpc/rpc-builder/tests/it/ws.rs +++ b/crates/rpc/rpc-builder/tests/it/ws.rs @@ -37,6 +37,14 @@ async fn test_eth_subscribe_all_supported_kinds_accept() { serde_json::json!({"topics": ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"]}), ], ), + ("transactionReceipts", vec![]), + ("transactionReceipts", vec![serde_json::json!({"transactionHashes": []})]), + ( + "transactionReceipts", + vec![ + serde_json::json!({"transactionHashes": ["0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060"]}), + ], + ), ]; for (kind, params) in cases { diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index 77b806407f4..19407f34099 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -2,9 +2,12 @@ use std::sync::Arc; +use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt}; use alloy_primitives::TxHash; use alloy_rpc_types_eth::{ - pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata}, + pubsub::{ + Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata, TransactionReceiptsParams, + }, Filter, Log, }; use futures::StreamExt; @@ -13,7 +16,8 @@ use jsonrpsee::{ }; use reth_chain_state::CanonStateSubscriptions; use reth_network_api::NetworkInfo; -use reth_rpc_convert::RpcHeader; +use reth_primitives_traits::TransactionMeta; +use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcHeader}; use reth_rpc_eth_api::{ pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction, }; @@ -185,10 +189,97 @@ where Ok(()) } - _ => { - // TODO: implement once https://github.com/alloy-rs/alloy/pull/3410 is released - Err(invalid_params_rpc_err("Unsupported subscription kind")) + SubscriptionKind::TransactionReceipts => { + let filter = match params { + Some(Params::TransactionReceipts(filter)) => filter, + None | Some(Params::None) => TransactionReceiptsParams::default(), + _ => { + return Err(invalid_params_rpc_err("Invalid params for transactionReceipts")) + } + }; + + let converter = self.inner.eth_api.converter(); + let stream = self.inner.eth_api.provider().canonical_state_stream().flat_map( + move |new_chain| { + // for each block in the new chain, build RPC receipts + let results: Vec<_> = new_chain + .committed() + .blocks_and_receipts() + .filter_map(|(block, receipts)| { + let block_hash = block.hash(); + let block_number = block.number(); + let base_fee = block.base_fee_per_gas(); + let excess_blob_gas = block.excess_blob_gas(); + let timestamp = block.timestamp(); + + let mut gas_used: u64 = 0; + let mut next_log_index: usize = 0; + + // build ConvertReceiptInput for each tx+receipt pair + // (same logic as eth_getBlockReceipts HTTP endpoint) + let inputs: Vec<_> = block + .transactions_recovered() + .zip(receipts.iter()) + .enumerate() + .filter_map(|(idx, (tx, receipt))| { + let gas_used_before = gas_used; + let next_log_index_before = next_log_index; + let cumulative_gas_used = receipt.cumulative_gas_used(); + + gas_used = cumulative_gas_used; + next_log_index += receipt.logs().len(); + + // apply transaction hash filter if provided + let matches = match &filter.transaction_hashes { + Some(hashes) if !hashes.is_empty() => { + hashes.contains(tx.tx_hash()) + } + _ => true, + }; + + matches.then(|| ConvertReceiptInput { + tx, + gas_used: cumulative_gas_used - gas_used_before, + next_log_index: next_log_index_before, + meta: TransactionMeta { + tx_hash: *tx.tx_hash(), + index: idx as u64, + block_hash, + block_number, + base_fee, + excess_blob_gas, + timestamp, + }, + receipt: receipt.clone(), + }) + }) + .collect(); + + if inputs.is_empty() { + return None; + } + + match converter.convert_receipts(inputs) { + Ok(rpc_receipts) => Some(rpc_receipts), + Err(err) => { + error!( + target = "rpc", + %err, + "Failed to convert receipts" + ); + None + } + } + }) + .collect(); + + futures::stream::iter(results) + }, + ); + + pipe_from_stream(accepted_sink, stream).await } + _ => Err(invalid_params_rpc_err("Unsupported subscription kind")), } } }