diff --git a/crates/transaction-pool/src/pool/blob.rs b/crates/transaction-pool/src/pool/blob.rs index 86476cdd855..455ca0b6b76 100644 --- a/crates/transaction-pool/src/pool/blob.rs +++ b/crates/transaction-pool/src/pool/blob.rs @@ -9,6 +9,14 @@ use std::{ sync::Arc, }; +/// Blob transactions with priority at or above this threshold are announced to peers. +/// +/// A priority of -1 means the transaction is within 1 fee-jump of being executable. Transactions +/// further away from execution are suppressed to save bandwidth. +/// +/// See also [`blob_tx_priority`]. +const BLOB_ANNOUNCE_PRIORITY_THRESHOLD: i64 = -1; + /// A set of validated blob transactions in the pool that are __not pending__. /// /// The purpose of this pool is to keep track of blob transactions that are queued and to evict the @@ -39,11 +47,14 @@ pub struct BlobTransactions { impl BlobTransactions { /// Adds a new transactions to the pending queue. /// + /// Returns `true` if the transaction is immediately announceable (priority >= + /// `BLOB_ANNOUNCE_PRIORITY_THRESHOLD`). + /// /// # Panics /// /// - If the transaction is not a blob tx. /// - If the transaction is already included. - pub fn add_transaction(&mut self, tx: Arc>) { + pub fn add_transaction(&mut self, tx: Arc>) -> bool { assert!(tx.is_eip4844(), "transaction is not a blob tx"); let id = *tx.id(); assert!(!self.contains(&id), "transaction already included {:?}", self.get(&id).unwrap()); @@ -53,10 +64,25 @@ impl BlobTransactions { self.size_of += tx.size(); // set transaction, which will also calculate priority based on current pending fees - let transaction = BlobTransaction::new(tx, submission_id, &self.pending_fees); + let mut transaction = BlobTransaction::new(tx, submission_id, &self.pending_fees); + + // Only announce if priority is high enough AND the predecessor nonce (if any) in this + // pool was already announced. This prevents announcing a tx whose predecessor is still + // suppressed — peers can't use it without the full nonce chain. + let predecessor_announced = id + .unchecked_ancestor() + .and_then(|prev_id| self.by_id.get(&prev_id)) + .is_none_or(|prev| prev.announced); + let announceable = + predecessor_announced && transaction.ord.priority >= BLOB_ANNOUNCE_PRIORITY_THRESHOLD; + if announceable { + transaction.announced = true; + } self.by_id.insert(id, transaction.clone()); self.all.insert(transaction); + + announceable } const fn next_id(&mut self) -> u64 { @@ -169,6 +195,10 @@ impl BlobTransactions { } /// Resorts the transactions in the pool based on the pool's current [`PendingFees`]. + /// + /// The `announced` flag is intentionally NOT reset when priority drops — once a transaction + /// has been announced, peers already have the hash and re-announcing serves no purpose. + /// This avoids repeated announcements during fee oscillations. pub(crate) fn reprioritize(&mut self) { // mem::take to modify without allocating, then collect to rebuild the BTreeSet self.all = std::mem::take(&mut self.all) @@ -186,6 +216,43 @@ impl BlobTransactions { } } + /// Returns blob transactions that are close enough to execution to announce but haven't + /// been announced yet. + /// + /// A transaction is announceable if its priority is >= [`BLOB_ANNOUNCE_PRIORITY_THRESHOLD`] + /// (within 1 fee-jump of being executable) and all lower-nonce transactions from the same + /// sender have already been announced. Once drained, the transactions are marked as announced + /// so they won't be returned again. + pub(crate) fn drain_newly_announceable(&mut self) -> Vec>> { + let mut result = Vec::new(); + // Track the sender whose nonce chain is broken so we skip its remaining txs. + // `by_id` is a BTreeMap ordered by (sender, nonce), so we naturally + // iterate each sender's transactions in nonce order. + let mut blocked_sender = None; + for (id, tx) in &mut self.by_id { + // Reset blocked state when we move to a new sender + if blocked_sender == Some(id.sender) { + continue + } + blocked_sender = None; + + if tx.announced { + continue + } + if tx.ord.priority >= BLOB_ANNOUNCE_PRIORITY_THRESHOLD { + tx.announced = true; + result.push(tx.transaction.clone()); + } else { + // This tx doesn't meet the threshold — block all remaining txs from this sender + // to maintain nonce-chain ordering. + blocked_sender = Some(id.sender); + } + } + // No need to sync `announced` back to `all` — it's not part of `Ord` so the BTreeSet + // ordering is unaffected. The `by_id` map is the source of truth for `announced` state. + result + } + /// Removes all transactions (and their descendants) which: /// * have a `max_fee_per_blob_gas` greater than or equal to the given `blob_fee`, _and_ /// * have a `max_fee_per_gas` greater than or equal to the given `base_fee` @@ -265,6 +332,11 @@ struct BlobTransaction { transaction: Arc>, /// The value that determines the order of this transaction. ord: BlobOrd, + /// Whether this transaction has been announced to peers. + /// + /// Only transactions with priority >= [`BLOB_ANNOUNCE_PRIORITY_THRESHOLD`] are announced. + /// Once set, this flag is never reset — peers already have the hash. + announced: bool, } impl BlobTransaction { @@ -282,7 +354,7 @@ impl BlobTransaction { pending_fees.base_fee as u128, ); let ord = BlobOrd { priority, submission_id }; - Self { transaction, ord } + Self { transaction, ord, announced: false } } /// Updates the priority for the transaction based on the current pending fees. @@ -298,7 +370,11 @@ impl BlobTransaction { impl Clone for BlobTransaction { fn clone(&self) -> Self { - Self { transaction: self.transaction.clone(), ord: self.ord.clone() } + Self { + transaction: self.transaction.clone(), + ord: self.ord.clone(), + announced: self.announced, + } } } diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 8899442064f..687707712d6 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -544,7 +544,7 @@ where /// This should be invoked when the pool drifted and accounts are updated manually pub fn update_accounts(&self, accounts: Vec) { let changed_senders = self.changed_senders(accounts.into_iter()); - let UpdateOutcome { promoted, discarded } = + let UpdateOutcome { promoted, discarded, .. } = self.pool.write().update_accounts(changed_senders); self.notify_on_transaction_updates(promoted, discarded); @@ -740,6 +740,11 @@ where self.on_new_pending_transaction(pending); } + // Notify listeners for blob transactions that are close enough to execution to announce + if let Some(tx) = meta.added.blob_announceable_transaction() { + self.on_blob_announceable_transaction(tx); + } + // Notify event listeners self.notify_event_listeners(&meta.added); @@ -747,27 +752,20 @@ where self.on_new_transaction(meta.added.into_new_transaction_event()); } - /// Notify all listeners about a new pending transaction. - /// - /// See also [`Self::add_pending_listener`] - /// - /// CAUTION: This function is only intended to be used manually in order to use this type's - /// pending transaction receivers when manually implementing the - /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation - /// [`TransactionPool::pending_transactions_listener_for`](crate::TransactionPool). - pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction) { + /// Sends hashes to all pending transaction listeners, cleaning up closed channels. + fn notify_pending_listeners( + &self, + mut send_fn: impl FnMut(&PendingTransactionHashListener) -> bool, + ) { let mut needs_cleanup = false; - { let listeners = self.pending_transaction_listener.read(); for listener in listeners.iter() { - if !listener.send_all(pending.pending_transactions(listener.kind)) { + if !send_fn(listener) { needs_cleanup = true; } } } - - // Clean up dead listeners if we detected any closed channels if needs_cleanup { self.pending_transaction_listener .write() @@ -775,6 +773,32 @@ where } } + /// Notify all listeners about a new pending transaction. + /// + /// See also [`Self::add_pending_listener`] + /// + /// CAUTION: This function is only intended to be used manually in order to use this type's + /// pending transaction receivers when manually implementing the + /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation + /// [`TransactionPool::pending_transactions_listener_for`](crate::TransactionPool). + pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction) { + self.notify_pending_listeners(|listener| { + listener.send_all(pending.pending_transactions(listener.kind)) + }); + } + + /// Notify pending transaction listeners about a blob transaction that is close enough to + /// execution to announce (within 1 fee-jump). + /// + /// Respects the `propagate` flag — private transactions are not announced. + fn on_blob_announceable_transaction(&self, tx: &Arc>) { + if !tx.propagate { + return; + } + let hash = *tx.hash(); + self.notify_pending_listeners(|listener| listener.send_all(std::iter::once(hash))); + } + /// Notify all listeners about a newly inserted pending transaction. /// /// See also [`Self::add_new_transaction_listener`] @@ -840,19 +864,11 @@ where fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome) { trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change"); - // notify about promoted pending transactions - emit hashes - let mut needs_pending_cleanup = false; - { - let listeners = self.pending_transaction_listener.read(); - for listener in listeners.iter() { - if !listener.send_all(outcome.pending_transactions(listener.kind)) { - needs_pending_cleanup = true; - } - } - } - if needs_pending_cleanup { - self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed()); - } + // notify about promoted pending transactions (and blob txs close to execution) - emit + // hashes + self.notify_pending_listeners(|listener| { + listener.send_all(outcome.pending_transactions(listener.kind)) + }); // emit full transactions let mut needs_tx_cleanup = false; @@ -868,7 +884,7 @@ where self.transaction_listener.write().retain(|l| !l.sender.is_closed()); } - let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome; + let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash, .. } = outcome; // broadcast specific transaction events self.with_event_listener(|listener| { @@ -1422,6 +1438,8 @@ pub enum AddedTransaction { subpool: SubPool, /// The specific reason why the transaction is queued (if applicable). queued_reason: Option, + /// Whether this blob transaction is close enough to execution to announce. + blob_announceable: bool, }, } @@ -1434,6 +1452,15 @@ impl AddedTransaction { } } + /// Returns the transaction if it was added to the blob pool and is close enough to + /// execution to announce. + pub const fn blob_announceable_transaction(&self) -> Option<&Arc>> { + match self { + Self::Parked { transaction, blob_announceable: true, .. } => Some(transaction), + _ => None, + } + } + /// Returns the replaced transaction if there was one pub const fn replaced(&self) -> Option<&Arc>> { match self { @@ -1596,10 +1623,14 @@ pub(crate) struct OnNewCanonicalStateOutcome { pub(crate) promoted: Vec>>, /// transaction that were discarded during the update pub(crate) discarded: Vec>>, + /// Blob transactions that became close enough to execution to announce (within 1 fee-jump) + /// but are still in the blob sub-pool. + pub(crate) blob_announced: Vec>>, } impl OnNewCanonicalStateOutcome { - /// Returns all transactions that were promoted to the pending pool and adhere to the given + /// Returns all transactions that were promoted to the pending pool (plus blob transactions + /// that became close enough to execution to announce) and adhere to the given /// [`TransactionListenerKind`]. /// /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that @@ -1608,11 +1639,12 @@ impl OnNewCanonicalStateOutcome { &self, kind: TransactionListenerKind, ) -> impl Iterator + '_ { - let iter = self.promoted.iter(); + let iter = self.promoted.iter().chain(self.blob_announced.iter()); PendingTransactionIter { kind, iter } } - /// Returns all FULL transactions that were promoted to the pending pool and adhere to the given + /// Returns all FULL transactions that were promoted to the pending pool (plus blob + /// transactions that became close enough to execution to announce) and adhere to the given /// [`TransactionListenerKind`]. /// /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that @@ -1621,7 +1653,7 @@ impl OnNewCanonicalStateOutcome { &self, kind: TransactionListenerKind, ) -> impl Iterator> + '_ { - let iter = self.promoted.iter(); + let iter = self.promoted.iter().chain(self.blob_announced.iter()); FullPendingTransactionIter { kind, iter } } } diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index 05a69471b0f..acd16e55028 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -217,13 +217,15 @@ impl TxPool { } /// Updates the tracked blob fee - fn update_blob_fee( + fn update_blob_fee( &mut self, mut pending_blob_fee: u128, base_fee_update: Ordering, mut on_promoted: F, + mut on_blob_announced: G, ) where F: FnMut(&Arc>), + G: FnMut(&Arc>), { std::mem::swap(&mut self.all_transactions.pending_fees.blob_fee, &mut pending_blob_fee); match (self.all_transactions.pending_fees.blob_fee.cmp(&pending_blob_fee), base_fee_update) @@ -245,7 +247,9 @@ impl TxPool { tx.subpool = tx.state.into(); tx.subpool }; - self.add_transaction_to_subpool(to, tx); + if self.add_transaction_to_subpool(to, tx.clone()) { + on_blob_announced(&tx); + } } } (Ordering::Less, _) | (_, Ordering::Less) => { @@ -270,6 +274,11 @@ impl TxPool { } } } + + // After any fee change, drain blob txs that are now close enough to announce + for tx in self.blob_pool.drain_newly_announceable() { + on_blob_announced(&tx); + } } /// Updates the tracked basefee @@ -362,9 +371,16 @@ impl TxPool { outcome.promoted.push(tx.clone()); }); if let Some(blob_fee) = info.pending_blob_fee { - self.update_blob_fee(blob_fee, basefee_ordering, |tx| { - outcome.promoted.push(tx.clone()); - }) + self.update_blob_fee( + blob_fee, + basefee_ordering, + |tx| { + outcome.promoted.push(tx.clone()); + }, + |tx| { + outcome.blob_announced.push(tx.clone()); + }, + ) } // then update tracked values self.all_transactions.set_block_info(info); @@ -627,9 +643,16 @@ impl TxPool { outcome.promoted.push(tx.clone()); }); - self.update_blob_fee(new_blob_fee, base_fee_ordering, |tx| { - outcome.promoted.push(tx.clone()); - }); + self.update_blob_fee( + new_blob_fee, + base_fee_ordering, + |tx| { + outcome.promoted.push(tx.clone()); + }, + |tx| { + outcome.blob_announced.push(tx.clone()); + }, + ); } /// Updates the transactions for the changed senders. @@ -700,6 +723,7 @@ impl TxPool { mined: mined_transactions, promoted: outcome.promoted, discarded: outcome.discarded, + blob_announced: outcome.blob_announced, } } @@ -776,28 +800,42 @@ impl TxPool { // 3. Promote higher-nonce txs last (e.g. gap-fill scenario) let new_nonce = transaction.id().nonce; let split = updates.iter().position(|u| u.id.nonce >= new_nonce); - let (promoted, discarded) = match split { + let (promoted, discarded, blob_announceable) = match split { // All updates are lower-nonce — promote them first, then add new tx None => { - let UpdateOutcome { promoted, discarded } = self.process_updates(updates); - self.add_new_transaction(transaction.clone(), replaced_tx.clone(), move_to); - (promoted, discarded) + let UpdateOutcome { promoted, discarded, .. } = + self.process_updates(updates); + let blob_announceable = self.add_new_transaction( + transaction.clone(), + replaced_tx.clone(), + move_to, + ); + (promoted, discarded, blob_announceable) } // All updates are higher-nonce — add new tx first, then promote Some(0) => { - self.add_new_transaction(transaction.clone(), replaced_tx.clone(), move_to); - let UpdateOutcome { promoted, discarded } = self.process_updates(updates); - (promoted, discarded) + let blob_announceable = self.add_new_transaction( + transaction.clone(), + replaced_tx.clone(), + move_to, + ); + let UpdateOutcome { promoted, discarded, .. } = + self.process_updates(updates); + (promoted, discarded, blob_announceable) } // Mixed — split and interleave Some(i) => { let after = updates.split_off(i); let mut outcome = self.process_updates(updates); - self.add_new_transaction(transaction.clone(), replaced_tx.clone(), move_to); + let blob_announceable = self.add_new_transaction( + transaction.clone(), + replaced_tx.clone(), + move_to, + ); let after_outcome = self.process_updates(after); outcome.promoted.extend(after_outcome.promoted); outcome.discarded.extend(after_outcome.discarded); - (outcome.promoted, outcome.discarded) + (outcome.promoted, outcome.discarded, blob_announceable) } }; self.metrics.inserted_transactions.increment(1); @@ -820,6 +858,7 @@ impl TxPool { subpool: move_to, replaced, queued_reason, + blob_announceable, } }; @@ -1194,37 +1233,46 @@ impl TxPool { } /// Inserts the transaction into the given sub-pool. + /// + /// Returns `true` if the transaction was added to the blob pool and is immediately + /// announceable (within 1 fee-jump of execution). fn add_transaction_to_subpool( &mut self, pool: SubPool, tx: Arc>, - ) { + ) -> bool { // We trace here instead of in structs directly, because the `ParkedPool` type is // generic and it would not be possible to distinguish whether a transaction is being // added to the `BaseFee` pool, or the `Queued` pool. trace!(target: "txpool", hash=%tx.transaction.hash(), ?pool, "Adding transaction to a subpool"); match pool { - SubPool::Queued => self.queued_pool.add_transaction(tx), + SubPool::Queued => { + self.queued_pool.add_transaction(tx); + false + } SubPool::Pending => { self.pending_pool.add_transaction(tx, self.all_transactions.pending_fees.base_fee); + false } SubPool::BaseFee => { self.basefee_pool.add_transaction(tx); + false } - SubPool::Blob => { - self.blob_pool.add_transaction(tx); - } + SubPool::Blob => self.blob_pool.add_transaction(tx), } } /// Inserts the transaction into the given sub-pool. /// Optionally, removes the replacement transaction. + /// + /// Returns `true` if the transaction was added to the blob pool and is immediately + /// announceable. fn add_new_transaction( &mut self, transaction: Arc>, replaced: Option<(Arc>, SubPool)>, pool: SubPool, - ) { + ) -> bool { if let Some((replaced, replaced_pool)) = replaced { // Remove the replaced transaction self.remove_from_subpool(replaced_pool, replaced.id()); @@ -3510,7 +3558,7 @@ mod tests { // Raise blob fee beyond the transaction's cap so it gets parked in Blob pool. let increased_blob_fee = tx.max_fee_per_blob_gas().unwrap() + 200; - pool.update_blob_fee(increased_blob_fee, Ordering::Equal, |_| {}); + pool.update_blob_fee(increased_blob_fee, Ordering::Equal, |_| {}, |_| {}); assert!(pool.pending_pool.is_empty()); assert_eq!(pool.blob_pool.len(), 1); diff --git a/crates/transaction-pool/src/pool/update.rs b/crates/transaction-pool/src/pool/update.rs index 851c017b206..2064d4dd20b 100644 --- a/crates/transaction-pool/src/pool/update.rs +++ b/crates/transaction-pool/src/pool/update.rs @@ -40,10 +40,13 @@ pub struct UpdateOutcome { pub promoted: Vec>>, /// transaction that failed and were discarded pub discarded: Vec>>, + /// Blob transactions that became close enough to execution to announce (within 1 fee-jump) + /// but are still in the blob sub-pool. + pub blob_announced: Vec>>, } impl Default for UpdateOutcome { fn default() -> Self { - Self { promoted: vec![], discarded: vec![] } + Self { promoted: vec![], discarded: vec![], blob_announced: vec![] } } }