Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/rpc/rpc-builder/tests/it/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ async fn test_eth_subscribe_all_supported_kinds_accept() {
serde_json::json!({"topics": ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"]}),
],
),
("transactionReceipts", vec![]),
("transactionReceipts", vec![serde_json::json!({"transactionHashes": []})]),
(
"transactionReceipts",
vec![
serde_json::json!({"transactionHashes": ["0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060"]}),
],
),
];

for (kind, params) in cases {
Expand Down
101 changes: 96 additions & 5 deletions crates/rpc/rpc/src/eth/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

use std::sync::Arc;

use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt};
use alloy_primitives::TxHash;
use alloy_rpc_types_eth::{
pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
pubsub::{
Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata, TransactionReceiptsParams,
},
Filter, Log,
};
use futures::StreamExt;
Expand All @@ -13,7 +16,8 @@ use jsonrpsee::{
};
use reth_chain_state::CanonStateSubscriptions;
use reth_network_api::NetworkInfo;
use reth_rpc_convert::RpcHeader;
use reth_primitives_traits::TransactionMeta;
use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcHeader};
use reth_rpc_eth_api::{
pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction,
};
Expand Down Expand Up @@ -185,10 +189,97 @@ where

Ok(())
}
_ => {
// TODO: implement once https://github.com/alloy-rs/alloy/pull/3410 is released
Err(invalid_params_rpc_err("Unsupported subscription kind"))
SubscriptionKind::TransactionReceipts => {
let filter = match params {
Some(Params::TransactionReceipts(filter)) => filter,
None | Some(Params::None) => TransactionReceiptsParams::default(),
_ => {
return Err(invalid_params_rpc_err("Invalid params for transactionReceipts"))
}
};

let converter = self.inner.eth_api.converter();
let stream = self.inner.eth_api.provider().canonical_state_stream().flat_map(
move |new_chain| {
// for each block in the new chain, build RPC receipts
let results: Vec<_> = new_chain
.committed()
.blocks_and_receipts()
.filter_map(|(block, receipts)| {
let block_hash = block.hash();
let block_number = block.number();
let base_fee = block.base_fee_per_gas();
let excess_blob_gas = block.excess_blob_gas();
let timestamp = block.timestamp();

let mut gas_used: u64 = 0;
let mut next_log_index: usize = 0;

// build ConvertReceiptInput for each tx+receipt pair
// (same logic as eth_getBlockReceipts HTTP endpoint)
let inputs: Vec<_> = block
.transactions_recovered()
.zip(receipts.iter())
.enumerate()
.filter_map(|(idx, (tx, receipt))| {
let gas_used_before = gas_used;
let next_log_index_before = next_log_index;
let cumulative_gas_used = receipt.cumulative_gas_used();

gas_used = cumulative_gas_used;
next_log_index += receipt.logs().len();

// apply transaction hash filter if provided
let matches = match &filter.transaction_hashes {
Some(hashes) if !hashes.is_empty() => {
hashes.contains(tx.tx_hash())
}
_ => true,
};

matches.then(|| ConvertReceiptInput {
tx,
gas_used: cumulative_gas_used - gas_used_before,
next_log_index: next_log_index_before,
meta: TransactionMeta {
tx_hash: *tx.tx_hash(),
index: idx as u64,
block_hash,
block_number,
base_fee,
excess_blob_gas,
timestamp,
},
receipt: receipt.clone(),
})
})
.collect();

if inputs.is_empty() {
return None;
}

match converter.convert_receipts(inputs) {
Ok(rpc_receipts) => Some(rpc_receipts),
Err(err) => {
error!(
target = "rpc",
%err,
"Failed to convert receipts"
);
None
}
}
})
.collect();

futures::stream::iter(results)
},
);

pipe_from_stream(accepted_sink, stream).await
}
_ => Err(invalid_params_rpc_err("Unsupported subscription kind")),
}
}
}
Expand Down
Loading