Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 80 additions & 4 deletions crates/transaction-pool/src/pool/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ use std::{
sync::Arc,
};

/// Blob transactions with priority at or above this threshold are announced to peers.
///
/// A priority of -1 means the transaction is within 1 fee-jump of being executable. Transactions
/// further away from execution are suppressed to save bandwidth.
///
/// See also [`blob_tx_priority`].
const BLOB_ANNOUNCE_PRIORITY_THRESHOLD: i64 = -1;

/// A set of validated blob transactions in the pool that are __not pending__.
///
/// The purpose of this pool is to keep track of blob transactions that are queued and to evict the
Expand Down Expand Up @@ -39,11 +47,14 @@ pub struct BlobTransactions<T: PoolTransaction> {
impl<T: PoolTransaction> BlobTransactions<T> {
/// Adds a new transactions to the pending queue.
///
/// Returns `true` if the transaction is immediately announceable (priority >=
/// `BLOB_ANNOUNCE_PRIORITY_THRESHOLD`).
///
/// # Panics
///
/// - If the transaction is not a blob tx.
/// - If the transaction is already included.
pub fn add_transaction(&mut self, tx: Arc<ValidPoolTransaction<T>>) {
pub fn add_transaction(&mut self, tx: Arc<ValidPoolTransaction<T>>) -> bool {
assert!(tx.is_eip4844(), "transaction is not a blob tx");
let id = *tx.id();
assert!(!self.contains(&id), "transaction already included {:?}", self.get(&id).unwrap());
Expand All @@ -53,10 +64,25 @@ impl<T: PoolTransaction> BlobTransactions<T> {
self.size_of += tx.size();

// set transaction, which will also calculate priority based on current pending fees
let transaction = BlobTransaction::new(tx, submission_id, &self.pending_fees);
let mut transaction = BlobTransaction::new(tx, submission_id, &self.pending_fees);

// Only announce if priority is high enough AND the predecessor nonce (if any) in this
// pool was already announced. This prevents announcing a tx whose predecessor is still
// suppressed — peers can't use it without the full nonce chain.
let predecessor_announced = id
.unchecked_ancestor()
.and_then(|prev_id| self.by_id.get(&prev_id))
.is_none_or(|prev| prev.announced);
let announceable =
predecessor_announced && transaction.ord.priority >= BLOB_ANNOUNCE_PRIORITY_THRESHOLD;
if announceable {
transaction.announced = true;
}

self.by_id.insert(id, transaction.clone());
self.all.insert(transaction);

announceable
}

const fn next_id(&mut self) -> u64 {
Expand Down Expand Up @@ -169,6 +195,10 @@ impl<T: PoolTransaction> BlobTransactions<T> {
}

/// Resorts the transactions in the pool based on the pool's current [`PendingFees`].
///
/// The `announced` flag is intentionally NOT reset when priority drops — once a transaction
/// has been announced, peers already have the hash and re-announcing serves no purpose.
/// This avoids repeated announcements during fee oscillations.
pub(crate) fn reprioritize(&mut self) {
// mem::take to modify without allocating, then collect to rebuild the BTreeSet
self.all = std::mem::take(&mut self.all)
Expand All @@ -186,6 +216,43 @@ impl<T: PoolTransaction> BlobTransactions<T> {
}
}

/// Returns blob transactions that are close enough to execution to announce but haven't
/// been announced yet.
///
/// A transaction is announceable if its priority is >= [`BLOB_ANNOUNCE_PRIORITY_THRESHOLD`]
/// (within 1 fee-jump of being executable) and all lower-nonce transactions from the same
/// sender have already been announced. Once drained, the transactions are marked as announced
/// so they won't be returned again.
pub(crate) fn drain_newly_announceable(&mut self) -> Vec<Arc<ValidPoolTransaction<T>>> {
let mut result = Vec::new();
// Track the sender whose nonce chain is broken so we skip its remaining txs.
// `by_id` is a BTreeMap<TransactionId, _> ordered by (sender, nonce), so we naturally
// iterate each sender's transactions in nonce order.
let mut blocked_sender = None;
for (id, tx) in &mut self.by_id {
// Reset blocked state when we move to a new sender
if blocked_sender == Some(id.sender) {
continue
}
blocked_sender = None;

if tx.announced {
continue
}
if tx.ord.priority >= BLOB_ANNOUNCE_PRIORITY_THRESHOLD {
tx.announced = true;
result.push(tx.transaction.clone());
} else {
// This tx doesn't meet the threshold — block all remaining txs from this sender
// to maintain nonce-chain ordering.
blocked_sender = Some(id.sender);
}
}
// No need to sync `announced` back to `all` — it's not part of `Ord` so the BTreeSet
// ordering is unaffected. The `by_id` map is the source of truth for `announced` state.
result
}

/// Removes all transactions (and their descendants) which:
/// * have a `max_fee_per_blob_gas` greater than or equal to the given `blob_fee`, _and_
/// * have a `max_fee_per_gas` greater than or equal to the given `base_fee`
Expand Down Expand Up @@ -265,6 +332,11 @@ struct BlobTransaction<T: PoolTransaction> {
transaction: Arc<ValidPoolTransaction<T>>,
/// The value that determines the order of this transaction.
ord: BlobOrd,
/// Whether this transaction has been announced to peers.
///
/// Only transactions with priority >= [`BLOB_ANNOUNCE_PRIORITY_THRESHOLD`] are announced.
/// Once set, this flag is never reset — peers already have the hash.
announced: bool,
}

impl<T: PoolTransaction> BlobTransaction<T> {
Expand All @@ -282,7 +354,7 @@ impl<T: PoolTransaction> BlobTransaction<T> {
pending_fees.base_fee as u128,
);
let ord = BlobOrd { priority, submission_id };
Self { transaction, ord }
Self { transaction, ord, announced: false }
}

/// Updates the priority for the transaction based on the current pending fees.
Expand All @@ -298,7 +370,11 @@ impl<T: PoolTransaction> BlobTransaction<T> {

impl<T: PoolTransaction> Clone for BlobTransaction<T> {
fn clone(&self) -> Self {
Self { transaction: self.transaction.clone(), ord: self.ord.clone() }
Self {
transaction: self.transaction.clone(),
ord: self.ord.clone(),
announced: self.announced,
}
}
}

Expand Down
96 changes: 64 additions & 32 deletions crates/transaction-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ where
/// This should be invoked when the pool drifted and accounts are updated manually
pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
let changed_senders = self.changed_senders(accounts.into_iter());
let UpdateOutcome { promoted, discarded } =
let UpdateOutcome { promoted, discarded, .. } =
self.pool.write().update_accounts(changed_senders);

self.notify_on_transaction_updates(promoted, discarded);
Expand Down Expand Up @@ -740,41 +740,65 @@ where
self.on_new_pending_transaction(pending);
}

// Notify listeners for blob transactions that are close enough to execution to announce
if let Some(tx) = meta.added.blob_announceable_transaction() {
self.on_blob_announceable_transaction(tx);
}

// Notify event listeners
self.notify_event_listeners(&meta.added);

// Notify new transaction listeners
self.on_new_transaction(meta.added.into_new_transaction_event());
}

/// Notify all listeners about a new pending transaction.
///
/// See also [`Self::add_pending_listener`]
///
/// CAUTION: This function is only intended to be used manually in order to use this type's
/// pending transaction receivers when manually implementing the
/// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
/// [`TransactionPool::pending_transactions_listener_for`](crate::TransactionPool).
pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
/// Sends hashes to all pending transaction listeners, cleaning up closed channels.
fn notify_pending_listeners(
&self,
mut send_fn: impl FnMut(&PendingTransactionHashListener) -> bool,
) {
let mut needs_cleanup = false;

{
let listeners = self.pending_transaction_listener.read();
for listener in listeners.iter() {
if !listener.send_all(pending.pending_transactions(listener.kind)) {
if !send_fn(listener) {
needs_cleanup = true;
}
}
}

// Clean up dead listeners if we detected any closed channels
if needs_cleanup {
self.pending_transaction_listener
.write()
.retain(|listener| !listener.sender.is_closed());
}
}

/// Notify all listeners about a new pending transaction.
///
/// See also [`Self::add_pending_listener`]
///
/// CAUTION: This function is only intended to be used manually in order to use this type's
/// pending transaction receivers when manually implementing the
/// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
/// [`TransactionPool::pending_transactions_listener_for`](crate::TransactionPool).
pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
self.notify_pending_listeners(|listener| {
listener.send_all(pending.pending_transactions(listener.kind))
});
}

/// Notify pending transaction listeners about a blob transaction that is close enough to
/// execution to announce (within 1 fee-jump).
///
/// Respects the `propagate` flag — private transactions are not announced.
fn on_blob_announceable_transaction(&self, tx: &Arc<ValidPoolTransaction<T::Transaction>>) {
if !tx.propagate {
return;
}
let hash = *tx.hash();
self.notify_pending_listeners(|listener| listener.send_all(std::iter::once(hash)));
}

/// Notify all listeners about a newly inserted pending transaction.
///
/// See also [`Self::add_new_transaction_listener`]
Expand Down Expand Up @@ -840,19 +864,11 @@ where
fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");

// notify about promoted pending transactions - emit hashes
let mut needs_pending_cleanup = false;
{
let listeners = self.pending_transaction_listener.read();
for listener in listeners.iter() {
if !listener.send_all(outcome.pending_transactions(listener.kind)) {
needs_pending_cleanup = true;
}
}
}
if needs_pending_cleanup {
self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
}
// notify about promoted pending transactions (and blob txs close to execution) - emit
// hashes
self.notify_pending_listeners(|listener| {
listener.send_all(outcome.pending_transactions(listener.kind))
});

// emit full transactions
let mut needs_tx_cleanup = false;
Expand All @@ -868,7 +884,7 @@ where
self.transaction_listener.write().retain(|l| !l.sender.is_closed());
}

let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash, .. } = outcome;

// broadcast specific transaction events
self.with_event_listener(|listener| {
Expand Down Expand Up @@ -1422,6 +1438,8 @@ pub enum AddedTransaction<T: PoolTransaction> {
subpool: SubPool,
/// The specific reason why the transaction is queued (if applicable).
queued_reason: Option<QueuedReason>,
/// Whether this blob transaction is close enough to execution to announce.
blob_announceable: bool,
},
}

Expand All @@ -1434,6 +1452,15 @@ impl<T: PoolTransaction> AddedTransaction<T> {
}
}

/// Returns the transaction if it was added to the blob pool and is close enough to
/// execution to announce.
pub const fn blob_announceable_transaction(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
match self {
Self::Parked { transaction, blob_announceable: true, .. } => Some(transaction),
_ => None,
}
}

/// Returns the replaced transaction if there was one
pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
match self {
Expand Down Expand Up @@ -1596,10 +1623,14 @@ pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
/// transaction that were discarded during the update
pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
/// Blob transactions that became close enough to execution to announce (within 1 fee-jump)
/// but are still in the blob sub-pool.
pub(crate) blob_announced: Vec<Arc<ValidPoolTransaction<T>>>,
}

impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
/// Returns all transactions that were promoted to the pending pool and adhere to the given
/// Returns all transactions that were promoted to the pending pool (plus blob transactions
/// that became close enough to execution to announce) and adhere to the given
/// [`TransactionListenerKind`].
///
/// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
Expand All @@ -1608,11 +1639,12 @@ impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
&self,
kind: TransactionListenerKind,
) -> impl Iterator<Item = B256> + '_ {
let iter = self.promoted.iter();
let iter = self.promoted.iter().chain(self.blob_announced.iter());
PendingTransactionIter { kind, iter }
}

/// Returns all FULL transactions that were promoted to the pending pool and adhere to the given
/// Returns all FULL transactions that were promoted to the pending pool (plus blob
/// transactions that became close enough to execution to announce) and adhere to the given
/// [`TransactionListenerKind`].
///
/// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
Expand All @@ -1621,7 +1653,7 @@ impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
&self,
kind: TransactionListenerKind,
) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
let iter = self.promoted.iter();
let iter = self.promoted.iter().chain(self.blob_announced.iter());
FullPendingTransactionIter { kind, iter }
}
}
Expand Down
Loading
Loading