Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 211 additions & 2 deletions crates/rpc-types-eth/src/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Ethereum types for pub-sub

use crate::{Filter, Header, Log, Transaction};
use alloc::{boxed::Box, format};
use crate::{Filter, Header, Log, Transaction, TransactionReceipt};
use alloc::{boxed::Box, format, vec::Vec};
use alloy_primitives::B256;
use alloy_serde::WithOtherFields;

Expand All @@ -20,6 +20,8 @@ pub enum SubscriptionResult<T = Transaction> {
FullTransaction(Box<T>),
/// SyncStatus
SyncState(PubSubSyncStatus),
/// Transaction Receipts
TransactionReceipts(Vec<TransactionReceipt>),
}

/// Response type for a SyncStatus subscription.
Expand Down Expand Up @@ -64,6 +66,7 @@ where
Self::TransactionHash(ref hash) => hash.serialize(serializer),
Self::FullTransaction(ref tx) => tx.serialize(serializer),
Self::SyncState(ref sync) => sync.serialize(serializer),
Self::TransactionReceipts(ref receipts) => receipts.serialize(serializer),
}
}
}
Expand Down Expand Up @@ -101,6 +104,51 @@ pub enum SubscriptionKind {
/// indicating that the synchronization has started (true), finished (false) or an object with
/// various progress indicators.
Syncing,
/// New transaction receipts subscription.
///
/// Returns transaction receipts that are included in new imported blocks and match the given
/// filter criteria. In case of a chain reorganization the subscription will emit transaction
/// receipts for the new chain if they match the filter criteria. Therefore, the subscription
/// can emit same transaction receipts multiple times.
TransactionReceipts,
}

/// The maximum number of transaction hash criteria allowed in a single subscription.
pub const MAX_TX_HASHES: usize = 200;

/// Parameters for transaction receipts subscription.
///
/// # Example
///
/// ```json
/// // Subscribe to specific transaction receipts
/// {
/// "transactionHashes": [
/// "0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060"
/// ]
/// }
/// ```
///
/// ```json
/// // Subscribe to all transaction receipts (no filter)
/// {
/// "transactionHashes": null
/// }
/// ```json
/// // Subscribe to all transaction receipts (no filter)
/// {
/// "transactionHashes": []
/// }
/// ```
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
pub struct TransactionReceiptsParams {
/// Optional list of transaction hashes to filter by.
/// If not provided or empty, all transaction receipts will be returned.
/// Limited to MAX_TX_HASHES items.
#[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "Option::is_none"))]
pub transaction_hashes: Option<Vec<B256>>,
}

/// Any additional parameters for a subscription.
Expand All @@ -113,6 +161,8 @@ pub enum Params {
Logs(Box<Filter>),
/// Boolean parameter for new pending transactions.
Bool(bool),
/// Transaction receipts parameters.
TransactionReceipts(TransactionReceiptsParams),
}

impl Params {
Expand Down Expand Up @@ -141,6 +191,75 @@ impl From<bool> for Params {
}
}

impl From<TransactionReceiptsParams> for Params {
fn from(params: TransactionReceiptsParams) -> Self {
Self::TransactionReceipts(params)
}
}

#[cfg(feature = "serde")]
impl<'de> serde::Deserialize<'de> for TransactionReceiptsParams {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
use core::fmt;
use serde::de::{Error, MapAccess, Visitor};

struct TransactionReceiptsParamsVisitor;

impl<'de> Visitor<'de> for TransactionReceiptsParamsVisitor {
type Value = TransactionReceiptsParams;

fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.write_str("a transaction receipts parameters object")
}

fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: MapAccess<'de>,
{
let mut transaction_hashes: Option<Option<Vec<B256>>> = None;

while let Some(key) = map.next_key::<alloc::string::String>()? {
match key.as_str() {
"transactionHashes" => {
if transaction_hashes.is_some() {
return Err(A::Error::duplicate_field("transactionHashes"));
}
let hashes: Option<Vec<B256>> = map.next_value()?;

if let Some(ref hash_vec) = hashes {
if hash_vec.len() > MAX_TX_HASHES {
return Err(A::Error::custom(format!(
"exceed max number of transaction hashes allowed per transactionReceipts subscription: {} items (max: {})",
hash_vec.len(),
MAX_TX_HASHES
)));
}
}

transaction_hashes = Some(hashes);
}
key => {
return Err(serde::de::Error::unknown_field(
key,
&["transactionHashes"],
))
}
}
}

Ok(TransactionReceiptsParams {
transaction_hashes: transaction_hashes.unwrap_or_default(),
})
}
}

deserializer.deserialize_map(TransactionReceiptsParamsVisitor)
}
}

#[cfg(feature = "serde")]
impl serde::Serialize for Params {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
Expand All @@ -151,6 +270,7 @@ impl serde::Serialize for Params {
Self::None => (&[] as &[serde_json::Value]).serialize(serializer),
Self::Logs(logs) => logs.serialize(serializer),
Self::Bool(full) => full.serialize(serializer),
Self::TransactionReceipts(params) => params.serialize(serializer),
}
}
}
Expand All @@ -173,6 +293,17 @@ impl<'a> serde::Deserialize<'a> for Params {
return Ok(val.into());
}

// Check if it's a transaction receipts parameter by looking for transactionHashes field
if let Some(obj) = v.as_object() {
if obj.contains_key("transactionHashes") {
return serde_json::from_value::<TransactionReceiptsParams>(v)
.map(Into::into)
.map_err(|e| {
D::Error::custom(format!("Invalid transaction receipts parameters: {e}"))
});
}
}

serde_json::from_value::<Filter>(v)
.map(Into::into)
.map_err(|e| D::Error::custom(format!("Invalid Pub-Sub parameters: {e}")))
Expand All @@ -182,6 +313,7 @@ impl<'a> serde::Deserialize<'a> for Params {
#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::hex;
use similar_asserts::assert_eq;

#[test]
Expand All @@ -199,6 +331,41 @@ mod tests {
let filter = Filter::default();
let s: Params = serde_json::from_str(&serde_json::to_string(&filter).unwrap()).unwrap();
assert_eq!(s, Params::Logs(Box::new(filter)));

// Test deserialization of transaction receipts parameters
let json = r#"{"transactionHashes":["0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060"]}"#;
let param: Params = serde_json::from_str(json).unwrap();
match param {
Params::TransactionReceipts(params) => {
assert_eq!(
params.transaction_hashes,
Some(vec![B256::from(hex!(
"0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060"
))])
);
}
_ => panic!("Expected TransactionReceipts variant"),
}

// Test deserialization of transaction receipts parameters, with null transactionHashes
let json = r#"{"transactionHashes":null}"#;
let param: Params = serde_json::from_str(json).unwrap();
match param {
Params::TransactionReceipts(params) => {
assert_eq!(params.transaction_hashes, None);
}
_ => panic!("Expected TransactionReceipts variant"),
}

// Test deserialization of transaction receipts parameters, with empty transactionHashes
let json = r#"{"transactionHashes":[]}"#;
let param: Params = serde_json::from_str(json).unwrap();
match param {
Params::TransactionReceipts(params) => {
assert_eq!(params.transaction_hashes, Some(vec![]));
}
_ => panic!("Expected TransactionReceipts variant"),
}
}

#[test]
Expand Down Expand Up @@ -243,6 +410,13 @@ mod tests {
assert_eq!(param, Params::Bool(false));
}

#[test]
fn params_from_transaction_receipts() {
let params = TransactionReceiptsParams { transaction_hashes: Some(vec![B256::random()]) };
let param: Params = params.clone().into();
assert_eq!(param, Params::TransactionReceipts(params));
}

#[test]
#[cfg(feature = "serde")]
fn params_serialize_none() {
Expand Down Expand Up @@ -272,4 +446,39 @@ mod tests {
let expected = serde_json::to_string(&filter).unwrap();
assert_eq!(serialized, expected);
}

#[test]
#[cfg(feature = "serde")]
fn params_serialize_transaction_receipts() {
let params = TransactionReceiptsParams {
transaction_hashes: Some(vec![B256::from(hex!(
"0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060"
))]),
};
let param = Params::TransactionReceipts(params);
let serialized = serde_json::to_string(&param).unwrap();
let expected = r#"{"transactionHashes":["0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060"]}"#;
assert_eq!(serialized, expected);
}

#[test]
#[cfg(feature = "serde")]
fn params_transaction_hashes_limit() {
// Test rejection of arrays exceeding the limit
let large_array: Vec<_> = (0..=MAX_TX_HASHES).map(|i| format!("0x{:064x}", i)).collect();
let json_payload = serde_json::json!({ "transactionHashes": large_array });
let result: Result<TransactionReceiptsParams, _> = serde_json::from_value(json_payload);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("exceed max number of transaction hashes"));

// Test acceptance of arrays at the limit
let valid_array: Vec<_> = (0..MAX_TX_HASHES).map(|i| format!("0x{:064x}", i)).collect();
let json_payload = serde_json::json!({ "transactionHashes": valid_array });
let result: Result<TransactionReceiptsParams, _> = serde_json::from_value(json_payload);
assert!(result.is_ok());
assert_eq!(result.unwrap().transaction_hashes.unwrap().len(), MAX_TX_HASHES);
}
}