Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions crates/transaction-pool/src/pool/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{traits::PropagateKind, PoolTransaction, SubPool, ValidPoolTransactio
use alloy_primitives::{TxHash, B256};
use std::sync::Arc;

use crate::pool::QueuedReason;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

Expand All @@ -11,7 +12,9 @@ pub enum FullTransactionEvent<T: PoolTransaction> {
/// Transaction has been added to the pending pool.
Pending(TxHash),
/// Transaction has been added to the queued pool.
Queued(TxHash),
///
/// If applicable, attached the specific reason why this was queued.
Queued(TxHash, Option<QueuedReason>),
/// Transaction has been included in the block belonging to this hash.
Mined {
/// The hash of the mined transaction.
Expand Down Expand Up @@ -40,7 +43,7 @@ impl<T: PoolTransaction> Clone for FullTransactionEvent<T> {
fn clone(&self) -> Self {
match self {
Self::Pending(hash) => Self::Pending(*hash),
Self::Queued(hash) => Self::Queued(*hash),
Self::Queued(hash, reason) => Self::Queued(*hash, reason.clone()),
Self::Mined { tx_hash, block_hash } => {
Self::Mined { tx_hash: *tx_hash, block_hash: *block_hash }
}
Expand Down
14 changes: 11 additions & 3 deletions crates/transaction-pool/src/pool/listener.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
//! Listeners for the transaction-pool

use crate::{
pool::events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent},
pool::{
events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent},
QueuedReason,
},
traits::{NewBlobSidecar, PropagateKind},
PoolTransaction, ValidPoolTransaction,
};
Expand All @@ -17,6 +20,7 @@ use tokio::sync::mpsc::{
self as mpsc, error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tracing::debug;

/// The size of the event channel used to propagate transaction events.
const TX_POOL_EVENT_CHANNEL_SIZE: usize = 1024;

Expand Down Expand Up @@ -164,8 +168,12 @@ impl<T: PoolTransaction> PoolEventBroadcast<T> {
}

/// Notify listeners about a transaction that was added to the queued pool.
pub(crate) fn queued(&mut self, tx: &TxHash) {
self.broadcast_event(tx, TransactionEvent::Queued, FullTransactionEvent::Queued(*tx));
pub(crate) fn queued(&mut self, tx: &TxHash, reason: Option<QueuedReason>) {
self.broadcast_event(
tx,
TransactionEvent::Queued,
FullTransactionEvent::Queued(*tx, reason),
);
}

/// Notify listeners about a transaction that was propagated.
Expand Down
4 changes: 2 additions & 2 deletions crates/transaction-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,8 +747,8 @@ where
listener.discarded(tx.hash());
}
}
AddedTransaction::Parked { transaction, replaced, .. } => {
listener.queued(transaction.hash());
AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
listener.queued(transaction.hash(), queued_reason.clone());
if let Some(replaced) = replaced {
listener.replaced(replaced.clone(), *transaction.hash());
}
Expand Down
2 changes: 1 addition & 1 deletion crates/transaction-pool/tests/it/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async fn txpool_listener_queued_event() {
assert_matches!(events.next().await, Some(TransactionEvent::Queued));

// The listener of all should receive queued event as well.
assert_matches!(all_tx_events.next().await, Some(FullTransactionEvent::Queued(hash)) if hash == *transaction.get_hash());
assert_matches!(all_tx_events.next().await, Some(FullTransactionEvent::Queued(hash,_ )) if hash == *transaction.get_hash());
}

#[tokio::test(flavor = "multi_thread")]
Expand Down