diff --git a/crates/net/network/src/budget.rs b/crates/net/network/src/budget.rs index 824148387b4..f1d9ca87469 100644 --- a/crates/net/network/src/budget.rs +++ b/crates/net/network/src/budget.rs @@ -35,13 +35,6 @@ pub const DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS: u32 = DEFAULT_BUD // Default is 40 pending pool imports. pub const DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS: u32 = 4 * DEFAULT_BUDGET_TRY_DRAIN_STREAM; -/// Default budget to try and stream hashes of successfully imported transactions from the pool. -/// -/// Default is naturally same as the number of transactions to attempt importing, -/// [`DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS`], so 40 pool imports. -pub const DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS: u32 = - DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS; - /// Polls the given stream. Breaks with `true` if there maybe is more work. #[macro_export] macro_rules! poll_nested_stream_with_budget { diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 9eb07e7b1a0..f4ef42523d5 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -28,8 +28,7 @@ use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_B use crate::{ budget::{ DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS, - DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS, - DEFAULT_BUDGET_TRY_DRAIN_STREAM, + DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM, }, cache::LruCache, duration_metered_exec, metered_poll_nested_stream_with_budget, @@ -77,7 +76,7 @@ use std::{ time::{Duration, Instant}, }; use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError}; -use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; +use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, trace}; /// The future for importing transactions into the pool. @@ -339,7 +338,7 @@ pub struct TransactionsManager< /// - no nonce gaps /// - all dynamic fee requirements are (currently) met /// - account has enough balance to cover the transaction's gas - pending_transactions: ReceiverStream, + pending_transactions: mpsc::Receiver, /// Incoming events from the [`NetworkManager`](crate::NetworkManager). transaction_events: UnboundedMeteredReceiver>, /// How the `TransactionsManager` is configured. @@ -422,7 +421,7 @@ impl peers: Default::default(), command_tx, command_rx: UnboundedReceiverStream::new(command_rx), - pending_transactions: ReceiverStream::new(pending), + pending_transactions: pending, transaction_events: UnboundedMeteredReceiver::new( from_network, NETWORK_POOL_TRANSACTIONS_SCOPE, @@ -1529,14 +1528,16 @@ where // We don't expect this buffer to be large, since only pending transactions are // emitted here. let mut new_txs = Vec::new(); - let maybe_more_pending_txns = metered_poll_nested_stream_with_budget!( - poll_durations.acc_imported_txns, - "net::tx", - "Pending transactions stream", - DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS, - this.pending_transactions.poll_next_unpin(cx), - |hash| new_txs.push(hash) - ); + let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many( + cx, + &mut new_txs, + SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE, + ) { + Poll::Ready(count) => { + count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE + } + Poll::Pending => false, + }; if !new_txs.is_empty() { this.on_new_pending_transactions(new_txs); }