Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/rpc/rpc/src/eth/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ where
}
}

/// The actual handler for and accepted [`EthPubSub::subscribe`] call.
/// The actual handler for an accepted [`EthPubSub::subscribe`] call.
async fn handle_accepted<Provider, Pool, Events, Network>(
pubsub: Arc<EthPubSubInner<Provider, Pool, Events, Network>>,
accepted_sink: SubscriptionSink,
Expand Down Expand Up @@ -267,7 +267,7 @@ impl<Provider, Pool, Events, Network> EthPubSubInner<Provider, Pool, Events, Net
where
Pool: TransactionPool + 'static,
{
/// Returns a stream that yields all transactions emitted by the txpool.
/// Returns a stream that yields all transaction hashes emitted by the txpool.
fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
ReceiverStream::new(self.pool.pending_transactions_listener())
}
Expand Down
56 changes: 52 additions & 4 deletions crates/transaction-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,18 +561,26 @@ where
})
}

/// Notifies transaction listeners about changes after a block was processed.
/// Notifies transaction listeners about changes once a block was processed.
fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
// notify about promoted pending transactions
{
let mut transaction_listeners = self.pending_transaction_listener.lock();
transaction_listeners.retain_mut(|listener| {
// emit hashes
let mut transaction_hash_listeners = self.pending_transaction_listener.lock();
transaction_hash_listeners.retain_mut(|listener| {
listener.send_all(outcome.pending_transactions(listener.kind))
});

// emit full transactions
let mut transaction_full_listeners = self.transaction_listener.lock();
transaction_full_listeners.retain_mut(|listener| {
listener.send_all(outcome.full_pending_transactions(listener.kind))
})
}

let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;

// broadcast specific transaction events
let mut listener = self.event_listener.write();

mined.iter().for_each(|tx| listener.mined(tx, block_hash));
Expand Down Expand Up @@ -918,6 +926,33 @@ where
}
}

/// An iterator over full pending transactions
pub(crate) struct FullPendingTransactionIter<Iter> {
kind: TransactionListenerKind,
iter: Iter,
}

impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
where
Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
T: PoolTransaction + 'a,
{
type Item = NewTransactionEvent<T>;

fn next(&mut self) -> Option<Self::Item> {
loop {
let next = self.iter.next()?;
if self.kind.is_propagate_only() && !next.propagate {
continue
}
return Some(NewTransactionEvent {
subpool: SubPool::Pending,
transaction: next.clone(),
})
}
}
}

/// Represents a transaction that was added into the pool and its state
#[derive(Debug, Clone)]
pub enum AddedTransaction<T: PoolTransaction> {
Expand Down Expand Up @@ -1011,7 +1046,7 @@ pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
pub(crate) block_hash: B256,
/// All mined transactions.
pub(crate) mined: Vec<TxHash>,
/// Transactions promoted to the ready queue.
/// Transactions promoted to the pending pool.
pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
/// transaction that were discarded during the update
pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
Expand All @@ -1030,4 +1065,17 @@ impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
let iter = self.promoted.iter();
PendingTransactionIter { kind, iter }
}

/// Returns all FULL transactions that were promoted to the pending pool and adhere to the given
/// [TransactionListenerKind].
///
/// If the kind is [TransactionListenerKind::PropagateOnly], then only transactions that
/// are allowed to be propagated are returned.
pub(crate) fn full_pending_transactions(
&self,
kind: TransactionListenerKind,
) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
let iter = self.promoted.iter();
FullPendingTransactionIter { kind, iter }
}
}