diff --git a/Cargo.lock b/Cargo.lock index 0270043a9d..d5b58b105a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -803,7 +803,6 @@ dependencies = [ "penumbra-ibc", "penumbra-proto", "penumbra-tower-trace", - "priority-queue", "prost", "rand 0.8.5", "rand_chacha 0.3.1", @@ -5856,17 +5855,6 @@ dependencies = [ "uint", ] -[[package]] -name = "priority-queue" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "509354d8a769e8d0b567d6821b84495c60213162761a732d68ce87c964bd347f" -dependencies = [ - "autocfg", - "equivalent", - "indexmap 2.2.6", -] - [[package]] name = "proc-macro-crate" version = "1.3.1" diff --git a/crates/astria-sequencer/.gitignore b/crates/astria-sequencer/.gitignore deleted file mode 100644 index d13b5831e8..0000000000 --- a/crates/astria-sequencer/.gitignore +++ /dev/null @@ -1 +0,0 @@ -app-genesis-state.json diff --git a/crates/astria-sequencer/Cargo.toml b/crates/astria-sequencer/Cargo.toml index cef3ced64a..a281eb37b2 100644 --- a/crates/astria-sequencer/Cargo.toml +++ b/crates/astria-sequencer/Cargo.toml @@ -31,7 +31,6 @@ cnidarium = { git = "https://github.com/penumbra-zone/penumbra.git", tag = "v0.7 cnidarium-component = { git = "https://github.com/penumbra-zone/penumbra.git", tag = "v0.78.0" } ibc-proto = { version = "0.41.0", features = ["server"] } matchit = "0.7.2" -priority-queue = "2.0.2" tower = "0.4" tower-abci = "0.12.0" tower-actor = "0.1.0" diff --git a/crates/astria-sequencer/src/app/mod.rs b/crates/astria-sequencer/src/app/mod.rs index 909705851d..b2977ab3c2 100644 --- a/crates/astria-sequencer/src/app/mod.rs +++ b/crates/astria-sequencer/src/app/mod.rs @@ -492,15 +492,16 @@ impl App { // get copy of transactions to execute from mempool let current_account_nonce_getter = |address: [u8; 20]| self.state.get_account_nonce(address); - let mut pending_txs = self + let pending_txs = self .mempool .builder_queue(current_account_nonce_getter) .await .expect("failed to fetch pending transactions"); - while let Some((timemarked_tx, _)) = pending_txs.pop() { - let tx_hash_base64 = telemetry::display::base64(&timemarked_tx.tx_hash()).to_string(); - let tx = timemarked_tx.signed_tx(); + let mut unused_count = pending_txs.len(); + for (tx_hash, tx) in pending_txs { + unused_count = unused_count.saturating_sub(1); + let tx_hash_base64 = telemetry::display::base64(&tx_hash).to_string(); let bytes = tx.to_raw().encode_to_vec(); let tx_len = bytes.len(); info!(transaction_hash = %tx_hash_base64, "executing transaction"); @@ -545,7 +546,7 @@ impl App { } // execute tx and store in `execution_results` list on success - match self.execute_transaction(Arc::new(tx.clone())).await { + match self.execute_transaction(tx.clone()).await { Ok(events) => { execution_results.push(ExecTxResult { events, @@ -605,7 +606,7 @@ impl App { excluded_txs.saturating_add(failed_tx_count), ); - debug!("{} {}", pending_txs.len(), "leftover pending transactions"); + debug!("{unused_count} leftover pending transactions"); self.metrics .set_transactions_in_mempool_total(self.mempool.len().await); @@ -891,9 +892,7 @@ impl App { .context("failed to write sequencer block to state")?; // update the priority of any txs in the mempool based on the updated app state - update_mempool_after_finalization(&mut self.mempool, &state_tx) - .await - .context("failed to update mempool after finalization")?; + update_mempool_after_finalization(&mut self.mempool, &state_tx).await; // events that occur after end_block are ignored here; // there should be none anyways. @@ -1149,9 +1148,9 @@ impl App { async fn update_mempool_after_finalization( mempool: &mut Mempool, state: &S, -) -> anyhow::Result<()> { +) { let current_account_nonce_getter = |address: [u8; 20]| state.get_account_nonce(address); - mempool.run_maintenance(current_account_nonce_getter).await + mempool.run_maintenance(current_account_nonce_getter).await; } /// relevant data of a block being executed. diff --git a/crates/astria-sequencer/src/app/test_utils.rs b/crates/astria-sequencer/src/app/test_utils.rs index 621a2bf770..925912656c 100644 --- a/crates/astria-sequencer/src/app/test_utils.rs +++ b/crates/astria-sequencer/src/app/test_utils.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use astria_core::{ crypto::SigningKey, primitive::v1::RollupId, @@ -137,30 +139,11 @@ pub(crate) async fn initialize_app( app } -pub(crate) fn get_mock_tx(nonce: u32) -> SignedTransaction { - let tx = UnsignedTransaction { - params: TransactionParams::builder() - .nonce(nonce) - .chain_id("test") - .build(), - actions: vec![ - SequenceAction { - rollup_id: RollupId::from_unhashed_bytes([0; 32]), - data: vec![0x99], - fee_asset: "astria".parse().unwrap(), - } - .into(), - ], - }; - - tx.into_signed(&get_alice_signing_key()) -} - -pub(crate) fn get_mock_tx_parameterized( +pub(crate) fn mock_tx( nonce: u32, signer: &SigningKey, - data_bytes: [u8; 32], -) -> SignedTransaction { + rollup_name: &str, +) -> Arc { let tx = UnsignedTransaction { params: TransactionParams::builder() .nonce(nonce) @@ -168,7 +151,7 @@ pub(crate) fn get_mock_tx_parameterized( .build(), actions: vec![ SequenceAction { - rollup_id: RollupId::from_unhashed_bytes(data_bytes), + rollup_id: RollupId::from_unhashed_bytes(rollup_name.as_bytes()), data: vec![0x99], fee_asset: "astria".parse().unwrap(), } @@ -176,5 +159,5 @@ pub(crate) fn get_mock_tx_parameterized( ], }; - tx.into_signed(signer) + Arc::new(tx.into_signed(signer)) } diff --git a/crates/astria-sequencer/src/app/tests_app.rs b/crates/astria-sequencer/src/app/tests_app.rs index e8ebacacb8..1f64ff45ab 100644 --- a/crates/astria-sequencer/src/app/tests_app.rs +++ b/crates/astria-sequencer/src/app/tests_app.rs @@ -449,7 +449,7 @@ async fn app_execution_results_match_proposal_vs_after_proposal() { // don't commit the result, now call prepare_proposal with the same data. // this will reset the app state. // this simulates executing the same block as a validator (specifically the proposer). - app.mempool.insert(signed_tx, 0).await.unwrap(); + app.mempool.insert(Arc::new(signed_tx), 0).await.unwrap(); let proposer_address = [88u8; 20].to_vec().try_into().unwrap(); let prepare_proposal = PrepareProposal { @@ -474,8 +474,7 @@ async fn app_execution_results_match_proposal_vs_after_proposal() { let current_account_nonce_getter = |address: [u8; 20]| app.state.get_account_nonce(address); app.mempool .run_maintenance(current_account_nonce_getter) - .await - .unwrap(); + .await; assert_eq!(app.mempool.len().await, 0); @@ -565,8 +564,8 @@ async fn app_prepare_proposal_cometbft_max_bytes_overflow_ok() { } .into_signed(&alice); - app.mempool.insert(tx_pass, 0).await.unwrap(); - app.mempool.insert(tx_overflow, 0).await.unwrap(); + app.mempool.insert(Arc::new(tx_pass), 0).await.unwrap(); + app.mempool.insert(Arc::new(tx_overflow), 0).await.unwrap(); // send to prepare_proposal let prepare_args = abci::request::PrepareProposal { @@ -589,9 +588,7 @@ async fn app_prepare_proposal_cometbft_max_bytes_overflow_ok() { let current_account_nonce_getter = |address: [u8; 20]| app.state.get_account_nonce(address); app.mempool .run_maintenance(current_account_nonce_getter) - .await - .unwrap(); - + .await; // see only first tx made it in assert_eq!( result.txs.len(), @@ -645,8 +642,8 @@ async fn app_prepare_proposal_sequencer_max_bytes_overflow_ok() { } .into_signed(&alice); - app.mempool.insert(tx_pass, 0).await.unwrap(); - app.mempool.insert(tx_overflow, 0).await.unwrap(); + app.mempool.insert(Arc::new(tx_pass), 0).await.unwrap(); + app.mempool.insert(Arc::new(tx_overflow), 0).await.unwrap(); // send to prepare_proposal let prepare_args = abci::request::PrepareProposal { @@ -669,8 +666,7 @@ async fn app_prepare_proposal_sequencer_max_bytes_overflow_ok() { let current_account_nonce_getter = |address: [u8; 20]| app.state.get_account_nonce(address); app.mempool .run_maintenance(current_account_nonce_getter) - .await - .unwrap(); + .await; // see only first tx made it in assert_eq!( diff --git a/crates/astria-sequencer/src/grpc/sequencer.rs b/crates/astria-sequencer/src/grpc/sequencer.rs index 4fcbffc1c6..47fe36cec8 100644 --- a/crates/astria-sequencer/src/grpc/sequencer.rs +++ b/crates/astria-sequencer/src/grpc/sequencer.rs @@ -261,12 +261,12 @@ mod test { let alice = get_alice_signing_key(); let alice_address = astria_address(&alice.address_bytes()); let nonce = 99; - let tx = crate::app::test_utils::get_mock_tx(nonce); + let tx = crate::app::test_utils::mock_tx(nonce, &get_alice_signing_key(), "test"); mempool.insert(tx, 0).await.unwrap(); - // insert tx into pending - let nonce = 0; - let tx = crate::app::test_utils::get_mock_tx(nonce); + // insert a tx with lower nonce also, but we should get the highest nonce + let lower_nonce = 98; + let tx = crate::app::test_utils::mock_tx(lower_nonce, &get_alice_signing_key(), "test"); mempool.insert(tx, 0).await.unwrap(); let server = Arc::new(SequencerServer::new(storage.clone(), mempool)); diff --git a/crates/astria-sequencer/src/mempool/benchmarks.rs b/crates/astria-sequencer/src/mempool/benchmarks.rs index c1399f6216..1768f37602 100644 --- a/crates/astria-sequencer/src/mempool/benchmarks.rs +++ b/crates/astria-sequencer/src/mempool/benchmarks.rs @@ -2,7 +2,10 @@ use std::{ collections::HashMap, - sync::OnceLock, + sync::{ + Arc, + OnceLock, + }, time::Duration, }; @@ -25,6 +28,10 @@ use astria_core::{ UnsignedTransaction, }, }; +use sha2::{ + Digest as _, + Sha256, +}; use super::{ Mempool, @@ -52,8 +59,8 @@ fn signing_keys() -> impl Iterator { } /// Returns a static ref to a collection of `MAX_INITIAL_TXS + 1` transactions. -fn transactions() -> &'static Vec { - static TXS: OnceLock> = OnceLock::new(); +fn transactions() -> &'static Vec> { + static TXS: OnceLock>> = OnceLock::new(); TXS.get_or_init(|| { let mut nonces_and_chain_ids = HashMap::new(); signing_keys() @@ -74,11 +81,12 @@ fn transactions() -> &'static Vec { data: vec![2; 1000], fee_asset: Denom::IbcPrefixed(IbcPrefixed::new([3; 32])), }; - UnsignedTransaction { + let tx = UnsignedTransaction { actions: vec![Action::Sequence(sequence_action)], params, } - .into_signed(signing_key) + .into_signed(signing_key); + Arc::new(tx) }) .take(MAX_INITIAL_TXS + 1) .collect() @@ -153,13 +161,21 @@ fn init_mempool() -> Mempool { for tx in transactions().iter().take(T::checked_size()) { mempool.insert(tx.clone(), 0).await.unwrap(); } + for i in 0..super::REMOVAL_CACHE_SIZE { + let hash = Sha256::digest(i.to_le_bytes()).into(); + mempool + .comet_bft_removal_cache + .write() + .await + .add(hash, RemovalReason::Expired); + } }); mempool } /// Returns the first transaction from the static `transactions()` not included in the initialized /// mempool, i.e. the one at index `T::size()`. -fn get_unused_tx() -> SignedTransaction { +fn get_unused_tx() -> Arc { transactions().get(T::checked_size()).unwrap().clone() } @@ -190,8 +206,7 @@ fn insert(bencher: divan::Bencher) { /// Benchmarks `Mempool::builder_queue` on a mempool with the given number of existing entries. /// -/// Note: this benchmark doesn't capture the nuances of dealing with parked vs pending -/// transactions. +/// Note: this benchmark doesn't capture the nuances of dealing with parked vs pending transactions. #[divan::bench( max_time = MAX_TIME, types = [ @@ -222,8 +237,8 @@ fn builder_queue(bencher: divan::Bencher) { /// Benchmarks `Mempool::remove_tx_invalid` for a single transaction on a mempool with the given /// number of existing entries. /// -/// Note about this benchmark: `remove_tx_invalid()` will removes all higher nonces. To keep this -/// benchmark comparible with the previous mempool, we're removing the highest nonce. In the future +/// Note about this benchmark: `remove_tx_invalid()` will remove all higher nonces. To keep this +/// benchmark comparable with the previous mempool, we're removing the highest nonce. In the future /// it would be better to have this bench remove the midpoint. #[divan::bench( max_time = MAX_TIME, @@ -243,6 +258,7 @@ fn remove_tx_invalid(bencher: divan::Bencher) { .with_inputs(|| { let signed_tx = transactions() .get(T::checked_size().saturating_sub(1)) + .cloned() .unwrap(); (init_mempool::(), signed_tx) }) @@ -255,6 +271,28 @@ fn remove_tx_invalid(bencher: divan::Bencher) { }); } +/// Benchmarks `Mempool::check_removed_comet_bft` for a single transaction on a mempool with the +/// `comet_bft_removal_cache` filled. +/// +/// Note that the number of entries in the main cache is irrelevant here. +#[divan::bench(max_time = MAX_TIME)] +fn check_removed_comet_bft(bencher: divan::Bencher) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + bencher + .with_inputs(|| { + let tx_hash = Sha256::digest(0_usize.to_le_bytes()).into(); + (init_mempool::(), tx_hash) + }) + .bench_values(move |(mempool, tx_hash)| { + runtime.block_on(async { + mempool.check_removed_comet_bft(tx_hash).await.unwrap(); + }); + }); +} + /// Benchmarks `Mempool::run_maintenance` on a mempool with the given number of existing entries. #[divan::bench( max_time = MAX_TIME, @@ -283,10 +321,7 @@ fn run_maintenance(bencher: divan::Bencher) { .with_inputs(|| init_mempool::()) .bench_values(move |mempool| { runtime.block_on(async { - mempool - .run_maintenance(current_account_nonce_getter) - .await - .unwrap(); + mempool.run_maintenance(current_account_nonce_getter).await; }); }); } diff --git a/crates/astria-sequencer/src/mempool/mod.rs b/crates/astria-sequencer/src/mempool/mod.rs index 5e76ad0207..1106d74bef 100644 --- a/crates/astria-sequencer/src/mempool/mod.rs +++ b/crates/astria-sequencer/src/mempool/mod.rs @@ -4,7 +4,6 @@ mod transactions_container; use std::{ collections::{ HashMap, - HashSet, VecDeque, }, future::Future, @@ -14,18 +13,25 @@ use std::{ use astria_core::protocol::transaction::v1alpha1::SignedTransaction; use tokio::{ - sync::RwLock, + join, + sync::{ + RwLock, + RwLockWriteGuard, + }, time::Duration, }; -use tracing::instrument; +use tracing::{ + error, + instrument, +}; +pub(crate) use transactions_container::InsertionError; use transactions_container::{ - BuilderQueue, - InsertionError, + ParkedTransactions, + PendingTransactions, TimemarkedTransaction, - TransactionContainer, }; -#[derive(Debug, Clone)] +#[derive(Debug, Eq, PartialEq, Clone)] pub(crate) enum RemovalReason { Expired, NonceStale, @@ -34,10 +40,13 @@ pub(crate) enum RemovalReason { FailedCheckTx(String), } -const TX_TTL: Duration = Duration::from_secs(240); // How long transactions are considered valid in the mempool. -const PARKED_SIZE_LIMIT: usize = 15; // Max number of parked transactions allowed per account. -const PENDING_SIZE_LIMIT: usize = 0; // Placeholder, is not enforced. -const REMOVAL_CACHE_SIZE: usize = 4096; // Max number of transactions to keep in the removal cache. Should be larger than the max number of transactions allowed in the cometBFT mempool. +/// How long transactions are considered valid in the mempool. +const TX_TTL: Duration = Duration::from_secs(240); +/// Max number of parked transactions allowed per account. +const MAX_PARKED_TXS_PER_ACCOUNT: usize = 15; +/// Max number of transactions to keep in the removal cache. Should be larger than the max number of +/// transactions allowed in the cometBFT mempool. +const REMOVAL_CACHE_SIZE: usize = 4096; /// `RemovalCache` is used to signal to `CometBFT` that a /// transaction can be removed from the `CometBFT` mempool. @@ -87,39 +96,34 @@ impl RemovalCache { } } -/// [`Mempool`] is an account-based structure for maintaining transactions -/// for execution. +/// [`Mempool`] is an account-based structure for maintaining transactions for execution. /// -/// The transactions are split between pending and parked, where pending -/// transactions are ready for execution and parked transactions could be -/// executable in the future. +/// The transactions are split between pending and parked, where pending transactions are ready for +/// execution and parked transactions could be executable in the future. /// -/// The mempool exposes the pending transactions through `builder_queue()`, -/// which returns all pending transactions sorted by the difference between the -/// transaction nonce and the current account nonce, and then by time first -/// seen. These transactions are returned as a copy. +/// The mempool exposes the pending transactions through `builder_queue()`, which returns a copy of +/// all pending transactions sorted in the order in which they should be executed. The sort order +/// is firstly by the difference between the transaction nonce and the account's current nonce +/// (ascending), and then by time first seen (ascending). /// /// The mempool implements the following policies: /// 1. Nonce replacement is not allowed. -/// 2. Accounts cannot have more than `PARKED_SIZE_LIMIT` transactions in their parked queues. +/// 2. Accounts cannot have more than `MAX_PARKED_TXS_PER_ACCOUNT` transactions in their parked +/// queues. /// 3. There is no account limit on pending transactions. /// 4. Transactions will expire and can be removed after `TX_TTL` time. /// 5. If an account has a transaction removed for being invalid or expired, all transactions for -/// that account with a higher nonce can be removed as well. This is due to the fact that we do +/// that account with a higher nonce will be removed as well. This is due to the fact that we do /// not execute failing transactions, so a transaction 'failing' will mean that further account /// nonces will not be able to execute either. /// /// Future extensions to this mempool can include: /// - maximum mempool size /// - account balance aware pending queue -/// -/// Note: when grabbing locks to hold, grab them in order of: all, pending, parked. This -/// is just a convention to prevent deadlocks. #[derive(Clone)] pub(crate) struct Mempool { - all: Arc>>, - pending: Arc>, - parked: Arc>, + pending: Arc>, + parked: Arc>>, comet_bft_removal_cache: Arc>, } @@ -127,17 +131,8 @@ impl Mempool { #[must_use] pub(crate) fn new() -> Self { Self { - all: Arc::new(RwLock::new(HashSet::<[u8; 32]>::new())), - pending: Arc::new(RwLock::new(TransactionContainer::new( - true, - PENDING_SIZE_LIMIT, - TX_TTL, - ))), - parked: Arc::new(RwLock::new(TransactionContainer::new( - false, - PARKED_SIZE_LIMIT, - TX_TTL, - ))), + pending: Arc::new(RwLock::new(PendingTransactions::new(TX_TTL))), + parked: Arc::new(RwLock::new(ParkedTransactions::new(TX_TTL))), comet_bft_removal_cache: Arc::new(RwLock::new(RemovalCache::new( NonZeroUsize::try_from(REMOVAL_CACHE_SIZE) .expect("Removal cache cannot be zero sized"), @@ -149,7 +144,12 @@ impl Mempool { #[must_use] #[instrument(skip_all)] pub(crate) async fn len(&self) -> usize { - self.all.read().await.len() + #[rustfmt::skip] + let (pending_len, parked_len) = join!( + async { self.pending.read().await.len() }, + async { self.parked.read().await.len() } + ); + pending_len.saturating_add(parked_len) } /// Inserts a transaction into the mempool and does not allow for transaction replacement. @@ -157,72 +157,58 @@ impl Mempool { #[instrument(skip_all)] pub(crate) async fn insert( &self, - tx: SignedTransaction, + tx: Arc, current_account_nonce: u32, ) -> anyhow::Result<(), InsertionError> { - let timemarked_tx = Arc::new(TimemarkedTransaction::new(tx)); - - // check if already tracked - if self.all.read().await.contains(&timemarked_tx.tx_hash()) { - return Err(InsertionError::AlreadyPresent); - } + let timemarked_tx = TimemarkedTransaction::new(tx); - // grab needed locks in normal order - let mut all = self.all.write().await; - let mut pending = self.pending.write().await; - let mut parked = self.parked.write().await; + let (mut pending, mut parked) = self.acquire_both_locks().await; // try insert into pending (will fail if nonce is gapped or already present) - let mut success = pending.add(timemarked_tx.clone(), current_account_nonce); - - match success { + match pending.add(timemarked_tx.clone(), current_account_nonce) { Err(InsertionError::NonceGap) => { + // Release the lock asap. + drop(pending); // try to add to parked queue - success = parked.add(timemarked_tx.clone(), current_account_nonce); + parked.add(timemarked_tx, current_account_nonce) } - Err( + error @ Err( InsertionError::AlreadyPresent | InsertionError::NonceTooLow | InsertionError::NonceTaken | InsertionError::AccountSizeLimit, - ) => { - // noop - } + ) => error, Ok(()) => { // check parked for txs able to be promoted let to_promote = parked.pop_front_account( timemarked_tx.address(), timemarked_tx - .signed_tx() .nonce() .checked_add(1) .expect("failed to increment nonce in promotion"), ); - for tx in to_promote { - assert!( - pending.add(tx, current_account_nonce).is_ok(), - "promotion should work" - ); + // Release the lock asap. + drop(parked); + for ttx in to_promote { + if let Err(error) = pending.add(ttx, current_account_nonce) { + error!( + current_account_nonce, + "failed to promote transaction during insertion: {error:#}" + ); + } } + Ok(()) } } - - if success.is_ok() { - // track in all list if successfully added - all.insert(timemarked_tx.tx_hash()); - } - - success } - /// Returns a copy of all transactions ready for execution, sorted - /// first by the difference between a transaction and the account's - /// nonce and then by the time that the transaction was first - /// seen by the appside mempool. + /// Returns a copy of all transactions and their hashes ready for execution, sorted first by the + /// difference between a transaction and the account's current nonce and then by the time that + /// the transaction was first seen by the appside mempool. pub(crate) async fn builder_queue( &self, current_account_nonce_getter: F, - ) -> anyhow::Result + ) -> anyhow::Result)>> where F: Fn([u8; 20]) -> O, O: Future>, @@ -234,52 +220,43 @@ impl Mempool { .await } - /// Removes the target transaction and all transactions for associated account - /// with higher nonces. + /// Removes the target transaction and all transactions for associated account with higher + /// nonces. /// - /// This function should only be used to remove invalid/failing transactions and - /// not executed transactions. Executed transactions will be removed in the `run_maintenance()` - /// function. + /// This function should only be used to remove invalid/failing transactions and not executed + /// transactions. Executed transactions will be removed in the `run_maintenance()` function. pub(crate) async fn remove_tx_invalid( &self, - signed_tx: &SignedTransaction, + signed_tx: Arc, reason: RemovalReason, - ) -> Vec> { - let ttx: Arc = - Arc::new(TimemarkedTransaction::new(signed_tx.clone())); - let mut removed_txs = Vec::>::new(); - - if !self.all.read().await.contains(&ttx.tx_hash()) { - return removed_txs; - } + ) { + let tx_hash = signed_tx.sha256_of_proto_encoding(); + let address = signed_tx.verification_key().address_bytes(); + + // Try to remove from pending. + let removed_txs = match self.pending.write().await.remove(signed_tx) { + Ok(mut removed_txs) => { + // Remove all of parked. + removed_txs.append(&mut self.parked.write().await.clear_account(&address)); + removed_txs + } + Err(signed_tx) => { + // Not found in pending, try to remove from parked and if not found, just return. + match self.parked.write().await.remove(signed_tx) { + Ok(removed_txs) => removed_txs, + Err(_) => return, + } + } + }; - // grab needed main locks in normal order - let mut all = self.all.write().await; - let mut pending = self.pending.write().await; - let mut parked = self.parked.write().await; + // Add all removed to removal cache for cometbft. let mut removal_cache = self.comet_bft_removal_cache.write().await; - - // mark as invalid in removal cache - removal_cache.add(ttx.tx_hash(), reason); - - // try remove from pending - removed_txs.append(&mut pending.remove(&ttx, true)); - if removed_txs.is_empty() { - // try remove transaction from parked - removed_txs.append(&mut parked.remove(&ttx, true)); - } else { - // remove all of parked - removed_txs.append(&mut parked.clear_account(ttx.address())); - } - assert!(!removed_txs.is_empty(), "error in remove_tx_invalid logic"); // TODO: is it ok to keep these in? - - // remove all from tracked and add to removal cache for cometbft - for tx in &removed_txs { - all.remove(&tx.tx_hash()); - removal_cache.add(tx.tx_hash(), RemovalReason::LowerNonceInvalidated); + // Add the original tx first, since it will also be listed in `removed_txs`. The second + // attempt to add it inside the loop below will be a no-op. + removal_cache.add(tx_hash, reason); + for removed_tx in removed_txs { + removal_cache.add(removed_tx, RemovalReason::LowerNonceInvalidated); } - - removed_txs } /// Checks if a transaction was flagged to be removed from the `CometBFT` mempool. Will @@ -295,54 +272,35 @@ impl Mempool { /// All removed transactions are added to the CometBFT removal cache to aid with CometBFT /// mempool maintenance. #[instrument(skip_all)] - pub(crate) async fn run_maintenance( - &self, - current_account_nonce_getter: F, - ) -> anyhow::Result<()> + pub(crate) async fn run_maintenance(&self, current_account_nonce_getter: F) where F: Fn([u8; 20]) -> O, O: Future>, { - // grab needed main locks in normal order - let mut all = self.all.write().await; - let mut pending = self.pending.write().await; - let mut parked = self.parked.write().await; - - // clean accounts of stale and expired tranasctions - let mut removed_txs = pending - .clean_accounts(¤t_account_nonce_getter) - .await - .expect("failed to clean pending"); - removed_txs.append( - &mut parked - .clean_accounts(¤t_account_nonce_getter) - .await - .expect("failed to clean pending"), - ); + let (mut pending, mut parked) = self.acquire_both_locks().await; + + // clean accounts of stale and expired transactions + let mut removed_txs = pending.clean_accounts(¤t_account_nonce_getter).await; + removed_txs.append(&mut parked.clean_accounts(¤t_account_nonce_getter).await); // run promotion logic in case transactions not in this mempool advanced account state - let promotable_txs = parked - .find_promotables(¤t_account_nonce_getter) - .await - .expect("should work?"); - for ttx in promotable_txs { - let current_account_nonce = current_account_nonce_getter(*ttx.address()) - .await - .expect("failed to get account nonce for promotions"); - assert!( - pending.add(ttx, current_account_nonce).is_ok(), - "promotions should work" - ); + let to_promote = parked.find_promotables(¤t_account_nonce_getter).await; + // Release the lock asap. + drop(parked); + for (ttx, current_account_nonce) in to_promote { + if let Err(error) = pending.add(ttx, current_account_nonce) { + error!( + current_account_nonce, + "failed to promote transaction during maintenance: {error:#}" + ); + } } - // remove from tracked and add to removal cache for cometbft + // add to removal cache for cometbft let mut removal_cache = self.comet_bft_removal_cache.write().await; - for (tx, reason) in removed_txs { - all.remove(&tx.tx_hash()); - removal_cache.add(tx.tx_hash(), reason); + for (tx_hash, reason) in removed_txs { + removal_cache.add(tx_hash, reason); } - - Ok(()) } /// Returns the highest pending nonce for the given address if it exists in the mempool. Note: @@ -353,6 +311,17 @@ impl Mempool { pub(crate) async fn pending_nonce(&self, address: [u8; 20]) -> Option { self.pending.read().await.pending_nonce(address) } + + async fn acquire_both_locks( + &self, + ) -> ( + RwLockWriteGuard, + RwLockWriteGuard>, + ) { + let pending = self.pending.write().await; + let parked = self.parked.write().await; + (pending, parked) + } } #[cfg(test)] @@ -360,7 +329,7 @@ mod test { use astria_core::crypto::SigningKey; use super::*; - use crate::app::test_utils::get_mock_tx_parameterized; + use crate::app::test_utils::mock_tx; #[tokio::test] async fn insert() { @@ -368,38 +337,35 @@ mod test { let signing_key = SigningKey::from([1; 32]); // sign and insert nonce 1 - let tx1 = get_mock_tx_parameterized(1, &signing_key, [0; 32]); + let tx1 = mock_tx(1, &signing_key, "test"); assert!( mempool.insert(tx1.clone(), 0).await.is_ok(), "should be able to insert nonce 1 transaction into mempool" ); // try to insert again - assert!( - matches!( - mempool.insert(tx1.clone(), 0).await, - Err(InsertionError::AlreadyPresent) - ), + assert_eq!( + mempool.insert(tx1.clone(), 0).await.unwrap_err(), + InsertionError::AlreadyPresent, "already present" ); // try to replace nonce - let tx1_replacement = get_mock_tx_parameterized(1, &signing_key, [1; 32]); - assert!( - matches!( - mempool.insert(tx1_replacement.clone(), 0).await, - Err(InsertionError::NonceTaken) - ), + let tx1_replacement = mock_tx(1, &signing_key, "test_0"); + assert_eq!( + mempool + .insert(tx1_replacement.clone(), 0) + .await + .unwrap_err(), + InsertionError::NonceTaken, "nonce replace not allowed" ); // add too low nonce - let tx0 = get_mock_tx_parameterized(0, &signing_key, [1; 32]); - assert!( - matches!( - mempool.insert(tx0.clone(), 1).await, - Err(InsertionError::NonceTooLow) - ), + let tx0 = mock_tx(0, &signing_key, "test"); + assert_eq!( + mempool.insert(tx0.clone(), 1).await.unwrap_err(), + InsertionError::NonceTooLow, "nonce too low" ); } @@ -418,28 +384,28 @@ mod test { // add nonces in odd order to trigger insertion promotion logic // sign and insert nonce 1 - let tx1 = get_mock_tx_parameterized(1, &signing_key, [0; 32]); + let tx1 = mock_tx(1, &signing_key, "test"); assert!( mempool.insert(tx1.clone(), 0).await.is_ok(), "should be able to insert nonce 1 transaction into mempool" ); // sign and insert nonce 2 - let tx2 = get_mock_tx_parameterized(2, &signing_key, [0; 32]); + let tx2 = mock_tx(2, &signing_key, "test"); assert!( mempool.insert(tx2.clone(), 0).await.is_ok(), "should be able to insert nonce 2 transaction into mempool" ); // sign and insert nonce 0 - let tx0 = get_mock_tx_parameterized(0, &signing_key, [0; 32]); + let tx0 = mock_tx(0, &signing_key, "test"); assert!( mempool.insert(tx0.clone(), 0).await.is_ok(), "should be able to insert nonce 0 transaction into mempool" ); // sign and insert nonce 4 - let tx4 = get_mock_tx_parameterized(4, &signing_key, [0; 32]); + let tx4 = mock_tx(4, &signing_key, "test"); assert!( mempool.insert(tx4.clone(), 0).await.is_ok(), "should be able to insert nonce 4 transaction into mempool" @@ -458,21 +424,14 @@ mod test { // grab building queue, should return transactions [1,2] since [0] was below and [4] is // gapped - let mut builder_queue = mempool + let builder_queue = mempool .builder_queue(current_account_nonce_getter) .await .expect("failed to get builder queue"); // see contains first two transactions that should be pending - let (returned_tx, _) = builder_queue - .pop() - .expect("should return lowest nonced transaction"); - assert_eq!(returned_tx.signed_tx().nonce(), 1, "nonce should be one"); - - let (returned_tx, _) = builder_queue - .pop() - .expect("should return other transaction"); - assert_eq!(returned_tx.signed_tx().nonce(), 2, "nonce should be two"); + assert_eq!(builder_queue[0].1.nonce(), 1, "nonce should be one"); + assert_eq!(builder_queue[1].1.nonce(), 2, "nonce should be two"); // see mempool's transactions just cloned, not consumed assert_eq!(mempool.len().await, 4); @@ -485,10 +444,7 @@ mod test { } Err(anyhow::anyhow!("invalid address")) }; - mempool - .run_maintenance(current_account_nonce_getter) - .await - .expect("failed to run maintenance"); + mempool.run_maintenance(current_account_nonce_getter).await; // assert mempool at 1 assert_eq!(mempool.len().await, 1); @@ -498,39 +454,37 @@ mod test { .builder_queue(current_account_nonce_getter) .await .expect("failed to get builder queue"); - let (returned_tx, _) = builder_queue.pop().expect("should return last transaction"); - assert_eq!(returned_tx.signed_tx().nonce(), 4, "nonce should be four"); + let (_, returned_tx) = builder_queue.pop().expect("should return last transaction"); + assert_eq!(returned_tx.nonce(), 4, "nonce should be four"); } #[tokio::test] - #[allow(unused_variables)] // for matches! macro - #[allow(clippy::too_many_lines)] async fn remove_invalid() { let mempool = Mempool::new(); let signing_key = SigningKey::from([1; 32]); // sign and insert nonces 0,1 and 3,4,5 - let tx0 = get_mock_tx_parameterized(0, &signing_key, [0; 32]); + let tx0 = mock_tx(0, &signing_key, "test"); assert!( mempool.insert(tx0.clone(), 0).await.is_ok(), "should be able to insert nonce 0 transaction into mempool" ); - let tx1 = get_mock_tx_parameterized(1, &signing_key, [0; 32]); + let tx1 = mock_tx(1, &signing_key, "test"); assert!( mempool.insert(tx1.clone(), 0).await.is_ok(), "should be able to insert nonce 1 transaction into mempool" ); - let tx3 = get_mock_tx_parameterized(3, &signing_key, [0; 32]); + let tx3 = mock_tx(3, &signing_key, "test"); assert!( mempool.insert(tx3.clone(), 0).await.is_ok(), "should be able to insert nonce 3 transaction into mempool" ); - let tx4 = get_mock_tx_parameterized(4, &signing_key, [0; 32]); + let tx4 = mock_tx(4, &signing_key, "test"); assert!( mempool.insert(tx4.clone(), 0).await.is_ok(), "should be able to insert nonce 4 transaction into mempool" ); - let tx5 = get_mock_tx_parameterized(5, &signing_key, [0; 32]); + let tx5 = mock_tx(5, &signing_key, "test"); assert!( mempool.insert(tx5.clone(), 0).await.is_ok(), "should be able to insert nonce 5 transaction into mempool" @@ -540,73 +494,30 @@ mod test { let removal_reason = RemovalReason::FailedPrepareProposal("reason".to_string()); // remove 4, should remove 4 and 5 - let mut removed_txs = mempool - .remove_tx_invalid(&tx4, removal_reason.clone()) + mempool + .remove_tx_invalid(tx4.clone(), removal_reason.clone()) .await; - - assert_eq!( - removed_txs - .pop() - .expect("should return transaction") - .signed_tx() - .nonce(), - 4 - ); - assert_eq!( - removed_txs - .pop() - .expect("should return transaction") - .signed_tx() - .nonce(), - 5 - ); assert_eq!(mempool.len().await, 3); // remove 4 again is also ok - let removed_txs = mempool + mempool .remove_tx_invalid( - &tx4, + tx4.clone(), RemovalReason::NonceStale, // shouldn't be inserted into removal cache ) .await; - assert_eq!(removed_txs.len(), 0); assert_eq!(mempool.len().await, 3); // remove 1, should remove 1 and 3 - let mut removed_txs = mempool - .remove_tx_invalid(&tx1, removal_reason.clone()) + mempool + .remove_tx_invalid(tx1.clone(), removal_reason.clone()) .await; - - assert_eq!( - removed_txs - .pop() - .expect("should return transaction") - .signed_tx() - .nonce(), - 3 - ); - assert_eq!( - removed_txs - .pop() - .expect("should return transaction") - .signed_tx() - .nonce(), - 1 - ); assert_eq!(mempool.len().await, 1); // remove 0 - let mut removed_txs = mempool - .remove_tx_invalid(&tx0, removal_reason.clone()) + mempool + .remove_tx_invalid(tx0.clone(), removal_reason.clone()) .await; - assert_eq!( - removed_txs - .pop() - .expect("should return transaction") - .signed_tx() - .nonce(), - 0 - ); assert_eq!(mempool.len().await, 0); // assert that all were added to the cometbft removal cache @@ -615,13 +526,13 @@ mod test { mempool .check_removed_comet_bft(tx0.sha256_of_proto_encoding()) .await, - removal_reason + Some(RemovalReason::FailedPrepareProposal(_)) )); assert!(matches!( mempool .check_removed_comet_bft(tx1.sha256_of_proto_encoding()) .await, - removal_reason + Some(RemovalReason::FailedPrepareProposal(_)) )); assert!(matches!( mempool @@ -633,7 +544,7 @@ mod test { mempool .check_removed_comet_bft(tx4.sha256_of_proto_encoding()) .await, - removal_reason + Some(RemovalReason::FailedPrepareProposal(_)) )); assert!(matches!( mempool @@ -654,24 +565,24 @@ mod test { let signing_address_2 = signing_key_2.verification_key().address_bytes(); // sign and insert nonces 0,1 - let tx0 = get_mock_tx_parameterized(0, &signing_key_0, [0; 32]); + let tx0 = mock_tx(0, &signing_key_0, "test"); assert!( mempool.insert(tx0.clone(), 0).await.is_ok(), "should be able to insert nonce 0 transaction into mempool" ); - let tx1 = get_mock_tx_parameterized(1, &signing_key_0, [0; 32]); + let tx1 = mock_tx(1, &signing_key_0, "test"); assert!( mempool.insert(tx1.clone(), 0).await.is_ok(), "should be able to insert nonce 1 transaction into mempool" ); // sign and insert nonces 100, 101 - let tx100 = get_mock_tx_parameterized(100, &signing_key_1, [0; 32]); + let tx100 = mock_tx(100, &signing_key_1, "test"); assert!( mempool.insert(tx100.clone(), 100).await.is_ok(), "should be able to insert nonce 100 transaction into mempool" ); - let tx101 = get_mock_tx_parameterized(101, &signing_key_1, [0; 32]); + let tx101 = mock_tx(101, &signing_key_1, "test"); assert!( mempool.insert(tx101.clone(), 100).await.is_ok(), "should be able to insert nonce 101 transaction into mempool" diff --git a/crates/astria-sequencer/src/mempool/transactions_container.rs b/crates/astria-sequencer/src/mempool/transactions_container.rs index da2320cb25..855140bf9d 100644 --- a/crates/astria-sequencer/src/mempool/transactions_container.rs +++ b/crates/astria-sequencer/src/mempool/transactions_container.rs @@ -1,40 +1,42 @@ use std::{ cmp::Ordering, collections::{ + hash_map, BTreeMap, HashMap, - HashSet, }, fmt, future::Future, + mem, sync::Arc, }; use anyhow::Context; use astria_core::protocol::transaction::v1alpha1::SignedTransaction; -use priority_queue::PriorityQueue; use tokio::time::{ Duration, Instant, }; +use tracing::error; use super::RemovalReason; -/// [`TimemarkedTransaction`] is a wrapper around a signed transaction -/// used to keep track of when that transaction was first seen in the mempool. -/// -/// Note: `PartialEq` was implemented for this struct to only take into account -/// the signed transaction's hash. -#[derive(Debug)] -pub(crate) struct TimemarkedTransaction { - signed_tx: SignedTransaction, +pub(super) type PendingTransactions = TransactionsContainer; +pub(super) type ParkedTransactions = + TransactionsContainer>; + +/// `TimemarkedTransaction` is a wrapper around a signed transaction used to keep track of when that +/// transaction was first seen in the mempool. +#[derive(Clone, Debug)] +pub(super) struct TimemarkedTransaction { + signed_tx: Arc, tx_hash: [u8; 32], time_first_seen: Instant, address: [u8; 20], } impl TimemarkedTransaction { - pub(crate) fn new(signed_tx: SignedTransaction) -> Self { + pub(super) fn new(signed_tx: Arc) -> Self { Self { tx_hash: signed_tx.sha256_of_proto_encoding(), address: signed_tx.verification_key().address_bytes(), @@ -57,41 +59,35 @@ impl TimemarkedTransaction { }) } - pub(crate) fn time_first_seen(&self) -> Instant { - self.time_first_seen - } - - pub(crate) fn signed_tx(&self) -> &SignedTransaction { - &self.signed_tx + fn is_expired(&self, now: Instant, ttl: Duration) -> bool { + now.saturating_duration_since(self.time_first_seen) > ttl } - pub(crate) fn tx_hash(&self) -> [u8; 32] { - self.tx_hash + pub(super) fn nonce(&self) -> u32 { + self.signed_tx.nonce() } - pub(crate) fn address(&self) -> &[u8; 20] { + pub(super) fn address(&self) -> &[u8; 20] { &self.address } } -/// Only consider `self.tx_hash` for equality. This is consistent with the impl for std `Hash`. -impl PartialEq for TimemarkedTransaction { - fn eq(&self, other: &Self) -> bool { - self.tx_hash == other.tx_hash - } -} - -impl Eq for TimemarkedTransaction {} - -/// Only consider `self.tx_hash` when hashing. This is consistent with the impl for equality. -impl std::hash::Hash for TimemarkedTransaction { - fn hash(&self, state: &mut H) { - self.tx_hash.hash(state); +impl fmt::Display for TimemarkedTransaction { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "tx_hash: {}, address: {}, signer: {}, nonce: {}, chain ID: {}", + telemetry::display::base64(&self.tx_hash), + telemetry::display::base64(&self.address), + self.signed_tx.verification_key(), + self.signed_tx.nonce(), + self.signed_tx.chain_id(), + ) } } -#[derive(Clone, Debug)] -pub(crate) struct TransactionPriority { +#[derive(Clone, Copy, Debug)] +struct TransactionPriority { nonce_diff: u32, time_first_seen: Instant, } @@ -125,24 +121,7 @@ impl PartialOrd for TransactionPriority { } } -/// [`BuilderQueue`] is a typed used to order transactions by the difference between a transaction -/// nonce and the account nonce then by the time that a transaction was first seen by the mempool. -pub(crate) type BuilderQueue = PriorityQueue, TransactionPriority>; - -/// [`AccountTransactionContainer`] is a container used for managing transactions belonging -/// to a single account. -/// -/// The `strict` mode defines if transaction nonces are allowed to be gapped or not. The -/// `size_limit` is only enfored when the `strict` mode is false. -#[derive(Clone, Debug)] -struct AccountTransactionContainer { - txs: BTreeMap>, // tracked transactions - strict: bool, // if nonce gaps are allowed or not - size_limit: usize, /* max number of transactions stored, only - * enforced if not strict */ -} - -#[derive(Debug, Clone)] +#[derive(Debug, Eq, PartialEq, Clone)] pub(crate) enum InsertionError { AlreadyPresent, NonceTooLow, @@ -154,533 +133,530 @@ pub(crate) enum InsertionError { impl fmt::Display for InsertionError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - InsertionError::AlreadyPresent => write!(f, "AlreadyPresent"), - InsertionError::NonceTooLow => write!(f, "NonceTooLow"), - InsertionError::NonceTaken => write!(f, "NonceTaken"), - InsertionError::NonceGap => write!(f, "NonceGap"), - InsertionError::AccountSizeLimit => write!(f, "AccountSizeLimit"), + InsertionError::AlreadyPresent => { + write!(f, "transaction already exists in the mempool") + } + InsertionError::NonceTooLow => { + write!(f, "given nonce has already been used previously") + } + InsertionError::NonceTaken => write!(f, "given nonce already exists in the mempool"), + InsertionError::NonceGap => write!(f, "gap in the pending nonce sequence"), + InsertionError::AccountSizeLimit => write!( + f, + "maximum number of pending transactions has been reached for the given account" + ), } } } -impl AccountTransactionContainer { - fn new(strict: bool, size_limit: usize) -> Self { - AccountTransactionContainer { - txs: BTreeMap::>::new(), - strict, - size_limit, - } +/// Transactions for a single account where the sequence of nonces must not have any gaps. +#[derive(Clone, Default, Debug)] +pub(super) struct PendingTransactionsForAccount { + txs: BTreeMap, +} + +impl PendingTransactionsForAccount { + fn highest_nonce(&self) -> Option { + self.txs.last_key_value().map(|(nonce, _)| *nonce) } +} - /// Adds transaction to the container. Note: does NOT allow for nonce replacement. - /// Will fail if `strict` is true and adding the transaction would create a nonce gap. - /// - /// `current_account_nonce` should be the nonce that the current accounts state is at. - /// Note: if the account `current_account_nonce` ever decreases, this is a logic error - /// and could mess up the validity of strict containers. - fn add( - &mut self, - ttx: Arc, - current_account_nonce: u32, - ) -> anyhow::Result<(), InsertionError> { - if !self.strict && self.txs.len() >= self.size_limit { - return Err(InsertionError::AccountSizeLimit); - } +impl TransactionsForAccount for PendingTransactionsForAccount { + fn txs(&self) -> &BTreeMap { + &self.txs + } - if ttx.signed_tx().nonce() < current_account_nonce { - return Err(InsertionError::NonceTooLow); - } + fn txs_mut(&mut self) -> &mut BTreeMap { + &mut self.txs + } + + fn is_at_tx_limit(&self) -> bool { + false + } + + fn is_sequential_nonce_precondition_met(&self, ttx: &TimemarkedTransaction) -> bool { + // If the `ttx` nonce is 0, precondition is met iff there are no other existing txs. + let Some(previous_nonce) = ttx.signed_tx.nonce().checked_sub(1) else { + return self.txs().is_empty(); + }; + + // Precondition is met if the previous nonce is in the existing txs, or if there are no + // existing txs. + self.txs().contains_key(&previous_nonce) || self.txs().is_empty() + } +} + +/// Transactions for a single account where gaps are allowed in the sequence of nonces, and with an +/// upper bound on the number of transactions. +#[derive(Clone, Default, Debug)] +pub(super) struct ParkedTransactionsForAccount { + txs: BTreeMap, +} - if self.txs.contains_key(&ttx.signed_tx.nonce()) { - return Err(InsertionError::NonceTaken); +impl ParkedTransactionsForAccount { + /// Returns contiguous transactions from front of queue starting from target nonce, removing the + /// transactions in the process. + /// + /// Note: this function only operates on the front of the queue. If the target nonce is not at + /// the front, an error will be logged and nothing will be returned. + fn pop_front_contiguous( + &mut self, + mut target_nonce: u32, + ) -> impl Iterator { + let mut split_at = 0; + for nonce in self.txs.keys() { + if *nonce == target_nonce { + let Some(next_target) = target_nonce.checked_add(1) else { + // We've got contiguous nonces up to `u32::MAX`; return everything. + return mem::take(&mut self.txs).into_values(); + }; + target_nonce = next_target; + split_at = next_target; + } else { + break; + } } - // ensure that if strict mode is on that the previous nonce is already in the list - if self.strict - && ttx.signed_tx.nonce() != 0 - && ttx.signed_tx.nonce() != current_account_nonce - && !self.txs.contains_key( - &ttx.signed_tx - .nonce() - .checked_sub(1) - .expect("Error subtracting from non zero nonce"), - ) - { - // nonce is gapped - return Err(InsertionError::NonceGap); + if split_at == 0 { + error!(target_nonce, "expected nonce to be present"); } - // add to internal structure - self.txs.insert(ttx.signed_tx.nonce(), ttx); + let mut split_off = self.txs.split_off(&split_at); + // The higher nonces are returned in `split_off`, but we want to keep these in `self.txs`, + // so swap the two collections. + mem::swap(&mut split_off, &mut self.txs); + split_off.into_values() + } +} - Ok(()) +impl TransactionsForAccount + for ParkedTransactionsForAccount +{ + fn txs(&self) -> &BTreeMap { + &self.txs } - /// Removes transaction from the container. If `remove_higher` is true, will - /// remove all higher nonces from account as well. Returns a vector of removed transactions. - fn remove(&mut self, nonce: u32, remove_higher: bool) -> Vec> { - let mut result = Vec::>::new(); - if !self.txs.contains_key(&nonce) { - // doesn't contain requested nonce - return result; - } + fn txs_mut(&mut self) -> &mut BTreeMap { + &mut self.txs + } - if remove_higher { - // remove from end till all higher nonces removed - loop { - let (key, value) = self - .txs - .pop_last() - .expect("Error popping values that should exist"); - result.push(value); - if key == nonce { - break; - } - } - } else { - // remove single element - result.push( - self.txs - .remove(&nonce) - .expect("Error removing value that should exist"), - ); - } - result + fn is_at_tx_limit(&self) -> bool { + self.txs.len() >= MAX_TX_COUNT } - /// Returns a copy of all of the contained transactions. - fn all_transactions_copy(&self) -> Vec> { - self.txs.clone().into_values().collect() + fn is_sequential_nonce_precondition_met(&self, _: &TimemarkedTransaction) -> bool { + true } +} - /// Returns all transactions, consuming them in the process. - fn pop_all_transactions(&mut self) -> Vec> { - let mut popped_txs = Vec::>::new(); - while let Some((_, tx)) = self.txs.pop_first() { - popped_txs.push(tx); - } - popped_txs +/// `TransactionsForAccount` is a trait for a collection of transactions belonging to a single +/// account. +pub(super) trait TransactionsForAccount: Default { + fn new() -> Self + where + Self: Sized + Default, + { + Self::default() } - /// Returns contiguous transactions from front of queue starting from target nonce, removing the - /// transactions in the process. Used when need to promote transactions from parked to - /// pending. + fn txs(&self) -> &BTreeMap; + + fn txs_mut(&mut self) -> &mut BTreeMap; + + fn is_at_tx_limit(&self) -> bool; + + /// Returns `Ok` if adding `ttx` would not break the nonce precondition, i.e. sequential + /// nonces with no gaps if in `SequentialNonces` mode. + fn is_sequential_nonce_precondition_met(&self, ttx: &TimemarkedTransaction) -> bool; + + /// Adds transaction to the container. Note: does NOT allow for nonce replacement. + /// Will fail if in `SequentialNonces` mode and adding the transaction would create a nonce gap. + /// + /// `current_account_nonce` should be the account's nonce in the latest chain state. /// - /// Note: this function only operates on the front of the queue. If the target nonce - /// is not at the front, nothing will be returned. - fn pop_front_multiple(&mut self, mut target_nonce: u32) -> Vec> { - assert!(!self.strict, "shouldn't be popping from strict queue"); - let mut popped_txs = Vec::>::new(); + /// Note: if the account `current_account_nonce` ever decreases, this is a logic error + /// and could mess up the validity of `SequentialNonces` containers. + fn add( + &mut self, + ttx: TimemarkedTransaction, + current_account_nonce: u32, + ) -> Result<(), InsertionError> { + if self.is_at_tx_limit() { + return Err(InsertionError::AccountSizeLimit); + } - while let Some((nonce, _)) = self.txs.first_key_value() { - if *nonce == target_nonce { - let (_, tx) = self - .txs - .pop_first() - .expect("popped value should exist in pop_front"); - popped_txs.push(tx); - target_nonce = target_nonce - .checked_add(1) - .expect("failed while incrementing nonce"); + if ttx.nonce() < current_account_nonce { + return Err(InsertionError::NonceTooLow); + } + + if let Some(existing_ttx) = self.txs().get(&ttx.signed_tx.nonce()) { + return Err(if existing_ttx.tx_hash == ttx.tx_hash { + InsertionError::AlreadyPresent } else { - // not target nonce - break; - } + InsertionError::NonceTaken + }); } - popped_txs - } - /// Pops the lowest transaction. - fn pop_front_single(&mut self) -> Option> { - if let Some((_, tx)) = self.txs.pop_first() { - Some(tx) - } else { - None + if !self.is_sequential_nonce_precondition_met(&ttx) { + return Err(InsertionError::NonceGap); } + + self.txs_mut().insert(ttx.signed_tx.nonce(), ttx); + + Ok(()) } - /// Returns the highest nonce value. - fn peek_end(&self) -> Option { - if let Some((nonce, _)) = self.txs.last_key_value() { - Some(*nonce) - } else { - None + /// Removes transactions with the given nonce and higher. + /// + /// Note: the given nonce is expected to be present. If it's absent, an error is logged and no + /// transactions are removed. + /// + /// Returns the hashes of the removed transactions. + fn remove(&mut self, nonce: u32) -> Vec<[u8; 32]> { + if !self.txs().contains_key(&nonce) { + error!(nonce, "transaction with given nonce not found"); + return Vec::new(); } + + self.txs_mut() + .split_off(&nonce) + .values() + .map(|ttx| ttx.tx_hash) + .collect() } - /// Returns a copy of the lowest transaction. - fn peek_front(&self) -> Option> { - if let Some((_, tx)) = self.txs.first_key_value() { - Some(tx.clone()) - } else { - None - } + /// Returns the transaction with the lowest nonce. + fn front(&self) -> Option<&TimemarkedTransaction> { + self.txs().first_key_value().map(|(_, ttx)| ttx) } - /// Remove transactions below the current account nonce. Used for - /// clearing out stale nonces. Returns the transactions that were removed. - fn fast_forward(&mut self, current_account_nonce: u32) -> Vec> { - let mut removed_txs = Vec::>::new(); - while let Some((nonce, _)) = self.txs.first_key_value() { - if *nonce < current_account_nonce { - let (_, tx) = self - .txs - .pop_first() - .expect("popped value should exist in fast_forward"); - removed_txs.push(tx); - } else { - // cleared out stale nonces - break; - } - } - removed_txs + /// Removes transactions below the given nonce. Returns the hashes of the removed transactions. + fn register_latest_account_nonce( + &mut self, + current_account_nonce: u32, + ) -> impl Iterator { + let mut split_off = self.txs_mut().split_off(¤t_account_nonce); + mem::swap(&mut split_off, self.txs_mut()); + split_off.into_values().map(|ttx| ttx.tx_hash) } - /// Returns the number of transactions in container. - fn size(&self) -> usize { - self.txs.len() + #[cfg(test)] + fn contains_tx(&self, tx_hash: &[u8; 32]) -> bool { + self.txs().values().any(|ttx| ttx.tx_hash == *tx_hash) } } -/// [`AccountTransactionContainer`] is a container used for mananging transactions for -/// multiple accounts. -/// -/// The `strict` mode defines if transaction nonces are allowed to be gapped or not for the managed -/// accounts. The `size_limit` is only enfored when the `strict` mode is false. +/// `TransactionsContainer` is a container used for managing transactions for multiple accounts. #[derive(Clone, Debug)] -pub(crate) struct TransactionContainer { - accounts: HashMap<[u8; 20], AccountTransactionContainer>, - all: HashSet<[u8; 32]>, - strict: bool, - size_limit: usize, +pub(super) struct TransactionsContainer { + /// A map of collections of transactions, indexed by the account address. + txs: HashMap<[u8; 20], T>, tx_ttl: Duration, } -impl TransactionContainer { - pub(crate) fn new(strict: bool, size_limit: usize, tx_ttl: Duration) -> Self { - assert!( - (!strict && size_limit != 0) || (strict && size_limit == 0), - "pending shouldn't have size restrictions and parked should" - ); - TransactionContainer { - accounts: HashMap::<[u8; 20], AccountTransactionContainer>::new(), - all: HashSet::<[u8; 32]>::new(), - strict, - size_limit, +impl TransactionsContainer { + pub(super) fn new(tx_ttl: Duration) -> Self { + TransactionsContainer:: { + txs: HashMap::new(), tx_ttl, } } - #[cfg(test)] - /// Returns the number of transactions in the container. - pub(crate) fn size(&self) -> usize { - self.all.len() - } - - /// Returns the highest nonce for an account. - pub(crate) fn pending_nonce(&self, address: [u8; 20]) -> Option { - if let Some(account) = self.accounts.get(&address) { - account.peek_end() - } else { - None - } - } - - /// Adds the transaction to the container. If failed, - /// returns the reason why. + /// Adds the transaction to the container. /// - /// `current_account_nonce` should be the current nonce - /// of the account associated with the transaction. If this - /// ever decreases, the strict containers could become messed up. - pub(crate) fn add( + /// `current_account_nonce` should be the current nonce of the account associated with the + /// transaction. If this ever decreases, the `TransactionsContainer` containers could become + /// invalid. + pub(super) fn add( &mut self, - ttx: Arc, + ttx: TimemarkedTransaction, current_account_nonce: u32, - ) -> anyhow::Result<(), InsertionError> { - // already tracked - if self.all.contains(&ttx.tx_hash()) { - return Err(InsertionError::AlreadyPresent); - } - - // create account map if necessary - let mut account_created = false; - if !self.accounts.contains_key(ttx.address()) { - account_created = true; - self.accounts.insert( - *ttx.address(), - AccountTransactionContainer::new(self.strict, self.size_limit), - ); - } - - // try to add transaction - let address_cache = *ttx.address(); - let hash_cache = ttx.tx_hash(); - let success = self - .accounts - .get_mut(ttx.address()) - .expect("AccountTransactionsContainer for account should exist") - .add(ttx, current_account_nonce); - - if success.is_ok() { - // add to all tracked if successfully added to account - self.all.insert(hash_cache); - } else if account_created { - // remove freshly created account if insertion failed - self.accounts.remove(&address_cache); + ) -> Result<(), InsertionError> { + match self.txs.entry(*ttx.address()) { + hash_map::Entry::Occupied(entry) => { + entry.into_mut().add(ttx, current_account_nonce)?; + } + hash_map::Entry::Vacant(entry) => { + let mut txs = T::new(); + txs.add(ttx, current_account_nonce)?; + entry.insert(txs); + } } - - success + Ok(()) } - /// Removes the target transaction and any transactions with higher - /// nonces if `remove_higher` is set to true. Returns all removed transactions. + /// Removes the given transaction and any transactions with higher nonces for the relevant + /// account. /// - /// Note: operates on the account<>nonce pair of the target transaction instead of the - /// transaction's hash. - pub(crate) fn remove( + /// If `signed_tx` existed, returns `Ok` with the hashes of the removed transactions. If + /// `signed_tx` was not in the collection, it is returned via `Err`. + pub(super) fn remove( &mut self, - ttx: &Arc, - remove_higher: bool, - ) -> Vec> { - let address = ttx.signed_tx.verification_key().address_bytes(); - - // return if no tracked account - if !self.accounts.contains_key(&address) { - return Vec::>::new(); - } + signed_tx: Arc, + ) -> Result, Arc> { + let address = signed_tx.verification_key().address_bytes(); + + // Take the collection for this account out of `self` temporarily. + let Some(mut account_txs) = self.txs.remove(&address) else { + return Err(signed_tx); + }; - // remove transactions - let removed_txs = self - .accounts - .get_mut(&address) - .expect("AccountTransactionsContainer for account should exist") - .remove(ttx.signed_tx.nonce(), remove_higher); + let removed = account_txs.remove(signed_tx.nonce()); - // remove transactions from all tracked - for tx in &removed_txs { - self.all.remove(&tx.tx_hash); + // Re-add the collection to `self` if it's not empty. + if !account_txs.txs().is_empty() { + let _ = self.txs.insert(address, account_txs); } - // remove account if out of transactions - if self - .accounts - .get(&address) - .expect("AccountTransactionsContainer for account should still exist") - .size() - == 0 - { - self.accounts.remove(&address); + if removed.is_empty() { + return Err(signed_tx); } - removed_txs + Ok(removed) } - /// Removes all of the transactions from an account and deletes the account entry. - /// Returns the removed transactions. - pub(crate) fn clear_account(&mut self, address: &[u8; 20]) -> Vec> { - let mut removed_txs = Vec::>::new(); - if let Some(account) = self.accounts.get_mut(address) { - removed_txs.append(&mut account.pop_all_transactions()); - - // remove from all - for tx in &removed_txs { - self.all.remove(&tx.tx_hash()); - } - // remove account - self.accounts.remove(address); - } - removed_txs + /// Removes all of the transactions for the given account and returns the hashes of the removed + /// transactions. + pub(super) fn clear_account(&mut self, address: &[u8; 20]) -> Vec<[u8; 32]> { + self.txs + .remove(address) + .map(|account_txs| account_txs.txs().values().map(|ttx| ttx.tx_hash).collect()) + .unwrap_or_default() } - /// Cleans all of the accounts in the container. Will remove any transactions with stale nonces - /// and will evict all transactions from accounts whose lowest transaction has expired. + /// Cleans all of the accounts in the container. Removes any transactions with stale nonces and + /// evicts all transactions from accounts whose lowest transaction has expired. /// /// Returns all transactions that have been removed with the reason why they have been removed. - pub(crate) async fn clean_accounts( + pub(super) async fn clean_accounts( &mut self, current_account_nonce_getter: &F, - ) -> anyhow::Result, RemovalReason)>> + ) -> Vec<([u8; 32], RemovalReason)> where F: Fn([u8; 20]) -> O, O: Future>, { // currently just removes stale nonces and will clear accounts if the // transactions are older than the TTL - let mut accounts_to_remove = Vec::<[u8; 20]>::new(); - let mut removed_txs = Vec::<(Arc, RemovalReason)>::new(); - for (address, account_txs) in &mut self.accounts { + let mut accounts_to_remove = Vec::new(); + let mut removed_txs = Vec::new(); + let now = Instant::now(); + for (address, account_txs) in &mut self.txs { // check if first tx is older than the TTL, if so, remove all transactions - if let Some(first_tx) = account_txs.peek_front() { - if first_tx.time_first_seen().elapsed() > self.tx_ttl { + if let Some(first_tx) = account_txs.front() { + if first_tx.is_expired(now, self.tx_ttl) { // first is stale, rest popped for invalidation - removed_txs.push(( + removed_txs.push((first_tx.tx_hash, RemovalReason::Expired)); + removed_txs.extend( account_txs - .pop_front_single() - .expect("first tx should exist"), - RemovalReason::Expired, - )); - for tx in account_txs.pop_all_transactions() { - println!("other nonce: {}", tx.signed_tx().nonce()); - removed_txs.push((tx, RemovalReason::LowerNonceInvalidated)); - } + .txs() + .values() + .skip(1) + .map(|ttx| (ttx.tx_hash, RemovalReason::LowerNonceInvalidated)), + ); + account_txs.txs_mut().clear(); } else { // clean to newest nonce - let current_account_nonce = current_account_nonce_getter(*address) - .await - .context("failed to fetch account nonce for fast forward")?; - for tx in account_txs.fast_forward(current_account_nonce) { - removed_txs.push((tx, RemovalReason::NonceStale)); - } + let current_account_nonce = match current_account_nonce_getter(*address).await { + Ok(nonce) => nonce, + Err(error) => { + error!( + address = %telemetry::display::base64(address), + "failed to fetch nonce from state when cleaning accounts: {error:#}" + ); + continue; + } + }; + removed_txs.extend( + account_txs + .register_latest_account_nonce(current_account_nonce) + .map(|tx_hash| (tx_hash, RemovalReason::NonceStale)), + ); } } - if account_txs.size() == 0 { + if account_txs.txs().is_empty() { accounts_to_remove.push(*address); } } // remove empty accounts for account in accounts_to_remove { - // remove empty accounts - self.accounts.remove(&account); + self.txs.remove(&account); } - // untrack transactions - for (tx, _) in &removed_txs { - self.all.remove(&tx.tx_hash()); - } + removed_txs + } - Ok(removed_txs) + /// Returns the number of transactions in the container. + pub(super) fn len(&self) -> usize { + self.txs + .values() + .map(|account_txs| account_txs.txs().len()) + .sum() } - /// Removes and returns transactions that are lower than or equal to the current nonce of - /// accounts. This is helpful when needing to promote transactions from parked to pending - /// during mempool maintenance. - pub(crate) async fn find_promotables( - &mut self, - current_account_nonce_getter: &F, - ) -> anyhow::Result>> + #[cfg(test)] + fn contains_tx(&self, tx_hash: &[u8; 32]) -> bool { + self.txs + .values() + .any(|account_txs| account_txs.contains_tx(tx_hash)) + } +} + +impl TransactionsContainer { + /// Returns the highest nonce for an account. + pub(super) fn pending_nonce(&self, address: [u8; 20]) -> Option { + self.txs + .get(&address) + .and_then(PendingTransactionsForAccount::highest_nonce) + } + + /// Returns a copy of transactions and their hashes sorted by nonce difference and then time + /// first seen. + pub(super) async fn builder_queue( + &self, + current_account_nonce_getter: F, + ) -> anyhow::Result)>> where F: Fn([u8; 20]) -> O, O: Future>, { - assert!(!self.strict, "should not be promoting from pending"); - let mut accounts_to_remove = Vec::<[u8; 20]>::new(); - let mut promoted_txs = Vec::>::new(); + // Used to hold the values in Vec for sorting. + struct QueueEntry { + tx: Arc, + tx_hash: [u8; 32], + priority: TransactionPriority, + } - for (address, account_txs) in &mut self.accounts { + let mut queue = Vec::with_capacity(self.len()); + // Add all transactions to the queue. + for (address, account_txs) in &self.txs { let current_account_nonce = current_account_nonce_getter(*address) .await - .context("failed to fetch account nonce for pop front")?; - - // find transactions that can be promoted - // note: can use current account nonce as target because this logic - // is only handling the case where transactions we didn't have in our - // local mempool were ran that would enable the parked transactions to - // be valid - promoted_txs.append(&mut account_txs.pop_front_multiple(current_account_nonce)); - - if account_txs.size() == 0 { - accounts_to_remove.push(*address); + .context("failed to fetch account nonce for builder queue")?; + for ttx in account_txs.txs.values() { + let priority = match ttx.priority(current_account_nonce) { + Ok(priority) => priority, + Err(error) => { + // mempool could be off due to node connectivity issues + error!( + tx_hash = %telemetry::display::base64(&ttx.tx_hash), + "failed to add pending tx to builder queue: {error:#}" + ); + continue; + } + }; + queue.push(QueueEntry { + tx: ttx.signed_tx.clone(), + tx_hash: ttx.tx_hash, + priority, + }); } } - // remove empty accounts - for account in accounts_to_remove { - // remove empty accounts - self.accounts.remove(&account); - } - - // untrack transactions - for tx in &promoted_txs { - self.all.remove(&tx.tx_hash()); - } - - Ok(promoted_txs) + // Sort the queue and return the relevant data. Note that the sorted queue will be ordered + // from lowest to highest priority, so we need to reverse the order before returning. + queue.sort_unstable_by_key(|entry| entry.priority); + Ok(queue + .into_iter() + .rev() + .map(|entry| (entry.tx_hash, entry.tx)) + .collect()) } +} +impl TransactionsContainer> { /// Removes and returns the transactions from the front of an account, similar to - /// `find_promotables()`. Useful for when needing to promote transactions from a specific + /// `find_promotables`. Useful for when needing to promote transactions from a specific /// account instead of all accounts. - pub(crate) fn pop_front_account( + pub(super) fn pop_front_account( &mut self, account: &[u8; 20], target_nonce: u32, - ) -> Vec> { - if let Some(account_txs) = self.accounts.get_mut(account) { - let removed_txs = account_txs.pop_front_multiple(target_nonce); - for tx in &removed_txs { - // remove popped transactions from all - self.all.remove(&tx.tx_hash()); - } + ) -> Vec { + // Take the collection for this account out of `self` temporarily. + let Some(mut account_txs) = self.txs.remove(account) else { + return Vec::new(); + }; - if account_txs.size() == 0 { - // remove empty account - self.accounts.remove(account); - } + let removed = account_txs.pop_front_contiguous(target_nonce); - removed_txs - } else { - Vec::>::new() + // Re-add the collection to `self` if it's not empty. + if !account_txs.txs().is_empty() { + let _ = self.txs.insert(*account, account_txs); } + removed.collect() } - /// Returns a copy of transactions sorted by nonce difference and then time first seen. - pub(crate) async fn builder_queue( - &self, - current_account_nonce_getter: F, - ) -> anyhow::Result + /// Removes and returns transactions along with their account's current nonce that are lower + /// than or equal to that nonce. This is helpful when needing to promote transactions from + /// parked to pending during mempool maintenance. + pub(super) async fn find_promotables( + &mut self, + current_account_nonce_getter: &F, + ) -> Vec<(TimemarkedTransaction, u32)> where F: Fn([u8; 20]) -> O, O: Future>, { - assert!( - self.strict, - "shouldn't be calling build on gapped container" - ); + let mut accounts_to_remove = Vec::new(); + let mut promoted_txs = Vec::new(); + + for (address, account_txs) in &mut self.txs { + let current_account_nonce = match current_account_nonce_getter(*address).await { + Ok(nonce) => nonce, + Err(error) => { + error!( + address = %telemetry::display::base64(address), + "failed to fetch nonce from state when finding promotables: {error:#}" + ); + continue; + } + }; - let mut builder_queue = BuilderQueue::new(); - for (address, account_txs) in &self.accounts { - let current_account_nonce = current_account_nonce_getter(*address) - .await - .context("failed to fetch account nonce for builder queue")?; + // find transactions that can be promoted + // note: can use current account nonce as target because this logic + // is only handling the case where transactions we didn't have in our + // local mempool were ran that would enable the parked transactions to + // be valid + promoted_txs.extend( + account_txs + .pop_front_contiguous(current_account_nonce) + .map(|ttx| (ttx, current_account_nonce)), + ); - let txs = account_txs.all_transactions_copy(); - for tx in txs { - match tx.priority(current_account_nonce) { - Ok(tx_priority) => { - builder_queue.push(tx.clone(), tx_priority); - } - Err(_) => continue, // mempool could be off due to node connectivity issues - } + if account_txs.txs.is_empty() { + accounts_to_remove.push(*address); } } - Ok(builder_queue) + + // remove empty accounts + for account in accounts_to_remove { + self.txs.remove(&account); + } + + promoted_txs } } #[cfg(test)] mod test { - use std::{ - hash::{ - Hash, - Hasher, - }, - time::Duration, - }; - use astria_core::crypto::SigningKey; use super::*; - use crate::app::test_utils::get_mock_tx_parameterized; + use crate::app::test_utils::mock_tx; - const STRICT_SIZE_LIMIT: usize = 0; - const UNSTRICT_SIZE_LIMIT: usize = 15; + const MAX_PARKED_TXS_PER_ACCOUNT: usize = 15; + const TX_TTL: Duration = Duration::from_secs(2); + + fn mock_ttx(nonce: u32, signer: &SigningKey) -> TimemarkedTransaction { + TimemarkedTransaction::new(mock_tx(nonce, signer, "test")) + } #[test] fn transaction_priority_should_error_if_invalid() { - let ttx = - TimemarkedTransaction::new(get_mock_tx_parameterized(0, &[1; 32].into(), [1; 32])); + let ttx = TimemarkedTransaction::new(mock_tx(0, &[1; 32].into(), "test")); let priority = ttx.priority(1); assert!( @@ -792,902 +768,532 @@ mod test { } #[test] - // From https://doc.rust-lang.org/std/hash/trait.Hash.html#hash-and-eq - fn timemarked_tx_hash_and_eq_should_be_consistent() { - // Check timemarked txs compare equal if and only if their tx hashes are equal. - let signed_tx_0_a = get_mock_tx_parameterized(0, &[1; 32].into(), [1; 32]); - let tx0 = TimemarkedTransaction { - tx_hash: [0; 32], - signed_tx: signed_tx_0_a.clone(), - address: signed_tx_0_a.verification_key().address_bytes(), - - time_first_seen: Instant::now(), - }; - let signed_tx_0_b = get_mock_tx_parameterized(1, &[1; 32].into(), [1; 32]); - let other_tx0 = TimemarkedTransaction { - tx_hash: [0; 32], - signed_tx: signed_tx_0_b.clone(), - address: signed_tx_0_b.verification_key().address_bytes(), - - time_first_seen: Instant::now(), - }; - let signed_tx_1 = get_mock_tx_parameterized(1, &[1; 32].into(), [1; 32]); - let tx1 = TimemarkedTransaction { - tx_hash: [1; 32], - signed_tx: signed_tx_1.clone(), - address: signed_tx_1.verification_key().address_bytes(), - time_first_seen: Instant::now(), - }; - assert!(tx0 == other_tx0); - assert!(tx0 != tx1); - - // Check timemarked txs' std hashes compare equal if and only if their tx hashes are equal. - let std_hash = |ttx: &TimemarkedTransaction| -> u64 { - let mut hasher = std::hash::DefaultHasher::new(); - ttx.hash(&mut hasher); - hasher.finish() - }; - assert!(std_hash(&tx0) == std_hash(&other_tx0)); - assert!(std_hash(&tx0) != std_hash(&tx1)); - } - - #[test] - fn account_trasaction_container_non_strict_add() { - let mut account_container = AccountTransactionContainer::new(false, UNSTRICT_SIZE_LIMIT); + fn parked_transactions_for_account_add() { + let mut parked_txs = ParkedTransactionsForAccount::::new(); // transactions to add - let ttx_1 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 1, - &[1; 32].into(), - [1; 32], - ))); - let ttx_3 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 3, - &[1; 32].into(), - [1; 32], - ))); - let ttx_5 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 5, - &[1; 32].into(), - [1; 32], - ))); + let ttx_1 = mock_ttx(1, &[1; 32].into()); + let ttx_3 = mock_ttx(3, &[1; 32].into()); + let ttx_5 = mock_ttx(5, &[1; 32].into()); let current_account_nonce = 2; - assert!(matches!( - account_container.add(ttx_3.clone(), current_account_nonce), - Ok(()) - )); - assert!(matches!( - account_container.add(ttx_3, current_account_nonce), - Err(InsertionError::NonceTaken) - )); + parked_txs + .add(ttx_3.clone(), current_account_nonce) + .unwrap(); + assert!(parked_txs.contains_tx(&ttx_3.tx_hash)); + assert_eq!( + parked_txs.add(ttx_3, current_account_nonce).unwrap_err(), + InsertionError::AlreadyPresent + ); // add gapped transaction - assert!(matches!( - account_container.add(ttx_5, current_account_nonce), - Ok(()) - )); + parked_txs.add(ttx_5, current_account_nonce).unwrap(); // fail adding too low nonce - assert!(matches!( - account_container.add(ttx_1, current_account_nonce), - Err(InsertionError::NonceTooLow) - )); + assert_eq!( + parked_txs.add(ttx_1, current_account_nonce).unwrap_err(), + InsertionError::NonceTooLow + ); } #[test] - fn account_trasaction_container_non_strict_size_limit() { - let mut account_container = AccountTransactionContainer::new(false, 2); + fn parked_transactions_for_account_size_limit() { + let mut parked_txs = ParkedTransactionsForAccount::<2>::new(); // transactions to add - let ttx_1 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 1, - &[1; 32].into(), - [1; 32], - ))); - let ttx_3 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 3, - &[1; 32].into(), - [1; 32], - ))); - let ttx_5 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 5, - &[1; 32].into(), - [1; 32], - ))); + let ttx_1 = mock_ttx(1, &[1; 32].into()); + let ttx_3 = mock_ttx(3, &[1; 32].into()); + let ttx_5 = mock_ttx(5, &[1; 32].into()); let current_account_nonce = 0; - assert!(matches!( - account_container.add(ttx_3.clone(), current_account_nonce), - Ok(()) - )); - assert!(matches!( - account_container.add(ttx_5, current_account_nonce), - Ok(()) - )); + parked_txs + .add(ttx_3.clone(), current_account_nonce) + .unwrap(); + parked_txs.add(ttx_5, current_account_nonce).unwrap(); // fail with size limit hit - assert!(matches!( - account_container.add(ttx_1, current_account_nonce), - Err(InsertionError::AccountSizeLimit) - )); + assert_eq!( + parked_txs.add(ttx_1, current_account_nonce).unwrap_err(), + InsertionError::AccountSizeLimit + ); } #[test] - fn account_trasaction_container_strict_add() { - let mut account_container = AccountTransactionContainer::new(true, STRICT_SIZE_LIMIT); + fn pending_transactions_for_account_add() { + let mut pending_txs = PendingTransactionsForAccount::new(); // transactions to add - let ttx_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &[1; 32].into(), - [1; 32], - ))); - let ttx_1 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 1, - &[1; 32].into(), - [1; 32], - ))); - let ttx_2 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 2, - &[1; 32].into(), - [1; 32], - ))); - let ttx_3 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 3, - &[1; 32].into(), - [1; 32], - ))); + let ttx_0 = mock_ttx(0, &[1; 32].into()); + let ttx_1 = mock_ttx(1, &[1; 32].into()); + let ttx_2 = mock_ttx(2, &[1; 32].into()); + let ttx_3 = mock_ttx(3, &[1; 32].into()); let current_account_nonce = 1; // too low nonces not added - assert!(matches!( - account_container.add(ttx_0, current_account_nonce), - Err(InsertionError::NonceTooLow) - )); + assert_eq!( + pending_txs.add(ttx_0, current_account_nonce).unwrap_err(), + InsertionError::NonceTooLow + ); + assert!(pending_txs.txs().is_empty()); // add ok - assert!(matches!( - account_container.add(ttx_1.clone(), current_account_nonce), - Ok(()) - )); - assert!(matches!( - account_container.add(ttx_1, current_account_nonce), - Err(InsertionError::NonceTaken) - )); + pending_txs + .add(ttx_1.clone(), current_account_nonce) + .unwrap(); + assert_eq!( + pending_txs.add(ttx_1, current_account_nonce).unwrap_err(), + InsertionError::AlreadyPresent + ); // gapped transaction not allowed - assert!(matches!( - account_container.add(ttx_3, current_account_nonce), - Err(InsertionError::NonceGap) - )); + assert_eq!( + pending_txs.add(ttx_3, current_account_nonce).unwrap_err(), + InsertionError::NonceGap + ); // can add consecutive - assert!(matches!( - account_container.add(ttx_2, current_account_nonce), - Ok(()) - )); + pending_txs.add(ttx_2, current_account_nonce).unwrap(); } #[test] - fn account_trasaction_container_remove() { - let mut account_container = AccountTransactionContainer::new(true, STRICT_SIZE_LIMIT); + fn transactions_for_account_remove() { + let mut account_txs = PendingTransactionsForAccount::new(); // transactions to add - let ttx_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &[1; 32].into(), - [1; 32], - ))); - let ttx_1 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 1, - &[1; 32].into(), - [1; 32], - ))); - let ttx_2 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 2, - &[1; 32].into(), - [1; 32], - ))); - let ttx_3 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 3, - &[1; 32].into(), - [1; 32], - ))); + let ttx_0 = mock_ttx(0, &[1; 32].into()); + let ttx_1 = mock_ttx(1, &[1; 32].into()); + let ttx_2 = mock_ttx(2, &[1; 32].into()); + let ttx_3 = mock_ttx(3, &[1; 32].into()); - account_container.add(ttx_0, 0).unwrap(); - account_container.add(ttx_1, 0).unwrap(); - account_container.add(ttx_2, 0).unwrap(); - account_container.add(ttx_3, 0).unwrap(); + account_txs.add(ttx_0.clone(), 0).unwrap(); + account_txs.add(ttx_1.clone(), 0).unwrap(); + account_txs.add(ttx_2.clone(), 0).unwrap(); + account_txs.add(ttx_3.clone(), 0).unwrap(); - // non-remove-higher remove ok + // remove from end will only remove end assert_eq!( - account_container.remove(1, false).len(), - 1, - "tranasction should've been removed" + account_txs.remove(3), + vec![ttx_3.tx_hash], + "only one transaction should've been removed" ); - assert_eq!(account_container.size(), 3); + assert_eq!(account_txs.txs().len(), 3); // remove same again return nothing assert_eq!( - account_container.remove(1, true).len(), + account_txs.remove(3).len(), 0, "no transaction should be removed" ); + assert_eq!(account_txs.txs().len(), 3); - // remove from end will only remove end + // remove from start will remove all assert_eq!( - account_container.remove(3, true).len(), - 1, - "only one transaction should've been removed" + account_txs.remove(0), + vec![ttx_0.tx_hash, ttx_1.tx_hash, ttx_2.tx_hash,], + "three transactions should've been removed" ); - assert_eq!(account_container.size(), 2); - - // remove higher from start will remove all - assert_eq!( - account_container.remove(0, true).len(), - 2, - "two transactions should've been removed" - ); - assert_eq!(account_container.size(), 0); + assert!(account_txs.txs().is_empty()); } #[test] - fn account_trasaction_pop_front_multiple() { - let mut account_container = AccountTransactionContainer::new(false, UNSTRICT_SIZE_LIMIT); + fn parked_transactions_for_account_pop_front_contiguous() { + let mut parked_txs = ParkedTransactionsForAccount::::new(); // transactions to add - let ttx_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &[1; 32].into(), - [1; 32], - ))); - let ttx_2 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 2, - &[1; 32].into(), - [1; 32], - ))); - let ttx_3 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 3, - &[1; 32].into(), - [1; 32], - ))); - let ttx_4 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 4, - &[1; 32].into(), - [1; 32], - ))); + let ttx_0 = mock_ttx(0, &[1; 32].into()); + let ttx_2 = mock_ttx(2, &[1; 32].into()); + let ttx_3 = mock_ttx(3, &[1; 32].into()); + let ttx_4 = mock_ttx(4, &[1; 32].into()); - account_container.add(ttx_0, 0).unwrap(); - account_container.add(ttx_2, 0).unwrap(); - account_container.add(ttx_3, 0).unwrap(); - account_container.add(ttx_4, 0).unwrap(); + parked_txs.add(ttx_0.clone(), 0).unwrap(); + parked_txs.add(ttx_2.clone(), 0).unwrap(); + parked_txs.add(ttx_3.clone(), 0).unwrap(); + parked_txs.add(ttx_4.clone(), 0).unwrap(); // lowest nonce not target nonce is noop assert_eq!( - account_container.pop_front_multiple(2).len(), + parked_txs.pop_front_contiguous(2).count(), 0, "no transaction should've been removed" ); - assert_eq!(account_container.size(), 4); + assert_eq!(parked_txs.txs().len(), 4); // will remove single value assert_eq!( - account_container.pop_front_multiple(0).len(), - 1, + parked_txs + .pop_front_contiguous(0) + .map(|ttx| ttx.tx_hash) + .collect::>(), + vec![ttx_0.tx_hash], "single transaction should've been returned" ); - assert_eq!(account_container.size(), 3); + assert_eq!(parked_txs.txs().len(), 3); // will remove multiple values assert_eq!( - account_container.pop_front_multiple(2).len(), - 3, + parked_txs + .pop_front_contiguous(2) + .map(|ttx| ttx.tx_hash) + .collect::>(), + vec![ttx_2.tx_hash, ttx_3.tx_hash, ttx_4.tx_hash], "multiple transaction should've been returned" ); - assert_eq!(account_container.size(), 0); + assert!(parked_txs.txs().is_empty()); } #[test] - fn account_trasaction_peek_end() { - let mut account_container = AccountTransactionContainer::new(false, UNSTRICT_SIZE_LIMIT); + fn pending_transactions_for_account_highest_nonce() { + let mut pending_txs = PendingTransactionsForAccount::new(); // no transactions ok - assert_eq!( - account_container.peek_end(), - None, + assert!( + pending_txs.highest_nonce().is_none(), "no transactions will return None" ); // transactions to add - let ttx_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &[1; 32].into(), - [1; 32], - ))); - let ttx_2 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 2, - &[1; 32].into(), - [1; 32], - ))); + let ttx_0 = mock_ttx(0, &[1; 32].into()); + let ttx_1 = mock_ttx(1, &[1; 32].into()); + let ttx_2 = mock_ttx(2, &[1; 32].into()); - account_container.add(ttx_0, 0).unwrap(); - account_container.add(ttx_2, 0).unwrap(); + pending_txs.add(ttx_0, 0).unwrap(); + pending_txs.add(ttx_1, 0).unwrap(); + pending_txs.add(ttx_2, 0).unwrap(); // will return last transaction assert_eq!( - account_container.peek_end(), + pending_txs.highest_nonce(), Some(2), "highest nonce should be returned" ); } #[test] - fn account_trasaction_peek_front() { - let mut account_container = AccountTransactionContainer::new(false, UNSTRICT_SIZE_LIMIT); + fn transactions_for_account_front() { + let mut parked_txs = ParkedTransactionsForAccount::::new(); // no transactions ok - assert_eq!( - account_container.peek_front(), - None, + assert!( + parked_txs.front().is_none(), "no transactions will return None" ); // transactions to add - let ttx_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &[1; 32].into(), - [1; 32], - ))); - let ttx_2 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 2, - &[1; 32].into(), - [1; 32], - ))); + let ttx_0 = mock_ttx(0, &[1; 32].into()); + let ttx_2 = mock_ttx(2, &[1; 32].into()); - account_container.add(ttx_0.clone(), 0).unwrap(); - account_container.add(ttx_2, 0).unwrap(); + parked_txs.add(ttx_0.clone(), 0).unwrap(); + parked_txs.add(ttx_2, 0).unwrap(); - // will return last transaction + // will return first transaction assert_eq!( - account_container.peek_front(), - Some(ttx_0), + parked_txs.front().unwrap().tx_hash, + ttx_0.tx_hash, "lowest transaction should be returned" ); } #[test] - fn account_trasaction_fast_forward() { - let mut account_container = AccountTransactionContainer::new(false, UNSTRICT_SIZE_LIMIT); + fn transactions_for_account_register_latest_account_nonce() { + let mut parked_txs = ParkedTransactionsForAccount::::new(); // transactions to add - let ttx_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &[1; 32].into(), - [1; 32], - ))); - let ttx_2 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 2, - &[1; 32].into(), - [1; 32], - ))); - let ttx_3 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 3, - &[1; 32].into(), - [1; 32], - ))); - let ttx_4 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 4, - &[1; 32].into(), - [1; 32], - ))); + let ttx_0 = mock_ttx(0, &[1; 32].into()); + let ttx_2 = mock_ttx(2, &[1; 32].into()); + let ttx_3 = mock_ttx(3, &[1; 32].into()); + let ttx_4 = mock_ttx(4, &[1; 32].into()); - account_container.add(ttx_0, 0).unwrap(); - account_container.add(ttx_2, 0).unwrap(); - account_container.add(ttx_3, 0).unwrap(); - account_container.add(ttx_4, 0).unwrap(); + parked_txs.add(ttx_0.clone(), 0).unwrap(); + parked_txs.add(ttx_2.clone(), 0).unwrap(); + parked_txs.add(ttx_3.clone(), 0).unwrap(); + parked_txs.add(ttx_4.clone(), 0).unwrap(); // matching nonce will not be removed assert_eq!( - account_container.fast_forward(0).len(), + parked_txs.register_latest_account_nonce(0).count(), 0, "no transaction should've been removed" ); - assert_eq!(account_container.size(), 4); + assert_eq!(parked_txs.txs().len(), 4); // fast forwarding to non existing middle nonce ok assert_eq!( - account_container.fast_forward(1).len(), - 1, - "one transaction should've been removed" + parked_txs + .register_latest_account_nonce(1) + .collect::>(), + vec![ttx_0.tx_hash], + "ttx_0 should've been removed" ); - assert_eq!(account_container.size(), 3); + assert_eq!(parked_txs.txs().len(), 3); // fast forwarding to existing nonce ok assert_eq!( - account_container.fast_forward(3).len(), - 1, + parked_txs + .register_latest_account_nonce(3) + .collect::>(), + vec![ttx_2.tx_hash], "one transaction should've been removed" ); - assert_eq!(account_container.size(), 2); + assert_eq!(parked_txs.txs().len(), 2); // fast forwarding to much higher nonce ok assert_eq!( - account_container.fast_forward(10).len(), - 2, - "two transaction should've been removed" + parked_txs + .register_latest_account_nonce(10) + .collect::>(), + vec![ttx_3.tx_hash, ttx_4.tx_hash], + "two transactions should've been removed" ); - assert_eq!(account_container.size(), 0); + assert!(parked_txs.txs().is_empty()); } #[test] - fn transaction_container_pending_nonces() { - let mut transaction_container = - TransactionContainer::new(false, UNSTRICT_SIZE_LIMIT, Duration::from_secs(2)); - let signing_key_0 = SigningKey::from([1; 32]); - let signing_address_0 = signing_key_0.clone().verification_key().address_bytes(); - - let signing_key_1 = SigningKey::from([2; 32]); - let signing_address_1 = signing_key_1.clone().verification_key().address_bytes(); - - // transactions to add for account 0 - let ttx_s0_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &signing_key_0, - [1; 32], - ))); - let ttx_s0_2 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 2, - &signing_key_0, - [1; 32], - ))); - - transaction_container.add(ttx_s0_0, 0).unwrap(); - transaction_container.add(ttx_s0_2, 0).unwrap(); + fn transactions_container_add() { + let mut pending_txs = PendingTransactions::new(TX_TTL); - // empty account returns zero - assert_eq!( - transaction_container.pending_nonce(signing_address_1), - None, - "empty account should return None" - ); - - // non empty account returns highest nonce - assert_eq!( - transaction_container.pending_nonce(signing_address_0), - Some(2), - "should return highest nonce" - ); - } - - #[test] - #[allow(clippy::too_many_lines)] - fn transaction_container_add() { - let mut transaction_container = - TransactionContainer::new(true, STRICT_SIZE_LIMIT, Duration::from_secs(2)); let signing_key_0 = SigningKey::from([1; 32]); - let signing_address_0 = signing_key_0.clone().verification_key().address_bytes(); + let signing_address_0 = signing_key_0.address_bytes(); let signing_key_1 = SigningKey::from([2; 32]); - let signing_address_1 = signing_key_1.clone().verification_key().address_bytes(); + let signing_address_1 = signing_key_1.address_bytes(); // transactions to add to accounts - // account, nonce, hash - let ttx_s0_0_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &signing_key_0, - [1; 32], - ))); - let ttx_s0_0_1 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &signing_key_0, - [2; 32], - ))); - let ttx_s0_2_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 2, - &signing_key_0, - [2; 32], - ))); - let ttx_s1_0_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &signing_key_1, - [1; 32], - ))); + let ttx_s0_0_0 = mock_ttx(0, &signing_key_0); + // Same nonce and signer as `ttx_s0_0_0`, but different rollup name, hence different tx. + let ttx_s0_0_1 = TimemarkedTransaction::new(mock_tx(0, &signing_key_0, "other")); + let ttx_s0_2_0 = mock_ttx(2, &signing_key_0); + let ttx_s1_0_0 = mock_ttx(0, &signing_key_1); // transactions to add for account 1 // initially no accounts should exist - assert_eq!( - transaction_container.accounts.len(), - 0, + assert!( + pending_txs.txs.is_empty(), "no accounts should exist at first" ); // adding too low nonce shouldn't create account - assert!( - matches!( - transaction_container.add(ttx_s0_0_0.clone(), 1), - Err(InsertionError::NonceTooLow) - ), + assert_eq!( + pending_txs.add(ttx_s0_0_0.clone(), 1).unwrap_err(), + InsertionError::NonceTooLow, "shouldn't be able to add nonce too low transaction" ); - assert_eq!( - transaction_container.accounts.len(), - 0, + assert!( + pending_txs.txs.is_empty(), "failed adds to new accounts shouldn't create account" ); // add one transaction - assert!( - matches!(transaction_container.add(ttx_s0_0_0.clone(), 0), Ok(())), - "should be able to add transaction" - ); - assert_eq!( - transaction_container.accounts.len(), - 1, - "one account should exist" - ); + pending_txs.add(ttx_s0_0_0.clone(), 0).unwrap(); + assert_eq!(pending_txs.txs.len(), 1, "one account should exist"); - // readding transaction should fail - assert!( - matches!( - transaction_container.add(ttx_s0_0_0, 0), - Err(InsertionError::AlreadyPresent) - ), - "readding same transaction should fail" + // re-adding transaction should fail + assert_eq!( + pending_txs.add(ttx_s0_0_0, 0).unwrap_err(), + InsertionError::AlreadyPresent, + "re-adding same transaction should fail" ); // nonce replacement fails - assert!( - matches!( - transaction_container.add(ttx_s0_0_1, 0), - Err(InsertionError::NonceTaken) - ), + assert_eq!( + pending_txs.add(ttx_s0_0_1, 0).unwrap_err(), + InsertionError::NonceTaken, "nonce replacement not supported" ); // nonce gaps not supported - assert!( - matches!( - transaction_container.add(ttx_s0_2_0, 0), - Err(InsertionError::NonceGap) - ), - "gapped nonces in strict not allowed" + assert_eq!( + pending_txs.add(ttx_s0_2_0, 0).unwrap_err(), + InsertionError::NonceGap, + "gapped nonces in pending transactions not allowed" ); // add transactions for account 2 - assert!( - matches!(transaction_container.add(ttx_s1_0_0, 0), Ok(())), - "should be able to add transaction to second account" - ); + pending_txs.add(ttx_s1_0_0, 0).unwrap(); // check internal structures + assert_eq!(pending_txs.txs.len(), 2, "two accounts should exist"); assert_eq!( - transaction_container.accounts.len(), - 2, - "two accounts should exist" - ); - assert_eq!( - transaction_container - .accounts - .get(&signing_address_0) - .unwrap() - .size(), + pending_txs.txs.get(&signing_address_0).unwrap().txs().len(), 1, "one transaction should be in the original account" ); assert_eq!( - transaction_container - .accounts - .get(&signing_address_1) - .unwrap() - .size(), + pending_txs.txs.get(&signing_address_1).unwrap().txs().len(), 1, "one transaction should be in the second account" ); assert_eq!( - transaction_container.size(), + pending_txs.len(), 2, "should only have two transactions tracked" ); } #[test] - fn transaction_container_remove() { - let mut transaction_container = - TransactionContainer::new(true, STRICT_SIZE_LIMIT, Duration::from_secs(2)); + fn transactions_container_remove() { + let mut pending_txs = PendingTransactions::new(TX_TTL); let signing_key_0 = SigningKey::from([1; 32]); - let signing_address_0 = signing_key_0.clone().verification_key().address_bytes(); - let signing_key_1 = SigningKey::from([2; 32]); - let signing_address_1 = signing_key_1.clone().verification_key().address_bytes(); // transactions to add to accounts - // account, nonce, hash - let ttx_s0_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &signing_key_0, - [1; 32], - ))); - let ttx_s0_1 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 1, - &signing_key_0, - [2; 32], - ))); - let ttx_s0_2 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 2, - &signing_key_0, - [2; 32], - ))); - let ttx_s1_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &signing_key_1, - [1; 32], - ))); - let ttx_s1_1 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 1, - &signing_key_1, - [1; 32], - ))); - let ttx_s1_2 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 2, - &signing_key_1, - [1; 32], - ))); + let ttx_s0_0 = mock_ttx(0, &signing_key_0); + let ttx_s0_1 = mock_ttx(1, &signing_key_0); + let ttx_s1_0 = mock_ttx(0, &signing_key_1); + let ttx_s1_1 = mock_ttx(1, &signing_key_1); - // remove on empty returns zero - assert_eq!( - transaction_container.remove(&ttx_s0_0, true).len(), - 0, + // remove on empty returns the tx in Err variant. + assert!( + pending_txs.remove(ttx_s0_0.signed_tx.clone()).is_err(), "zero transactions should be removed from non existing accounts" ); // add transactions - transaction_container.add(ttx_s0_0.clone(), 0).unwrap(); - transaction_container.add(ttx_s0_1.clone(), 0).unwrap(); - transaction_container.add(ttx_s0_2.clone(), 0).unwrap(); - transaction_container.add(ttx_s1_0.clone(), 0).unwrap(); - transaction_container.add(ttx_s1_1.clone(), 0).unwrap(); - transaction_container.add(ttx_s1_2.clone(), 0).unwrap(); - - // remove with remove_higher to false should only remove one - assert_eq!( - transaction_container.remove(&ttx_s0_0, false).len(), - 1, - "single transactions should be removed when remove_higher is false" - ); - assert_eq!( - transaction_container - .accounts - .get(&signing_address_0) - .unwrap() - .size(), - 2, - "two transactions should be in the original account" - ); + pending_txs.add(ttx_s0_0.clone(), 0).unwrap(); + pending_txs.add(ttx_s0_1.clone(), 0).unwrap(); + pending_txs.add(ttx_s1_0.clone(), 0).unwrap(); + pending_txs.add(ttx_s1_1.clone(), 0).unwrap(); - // remove with remove_higher set to true should remove rest + // remove should remove tx and higher assert_eq!( - transaction_container.remove(&ttx_s0_1, true).len(), - 2, - "rest of transactions should be removed when remove_higher is true and targeting \ - bottom nonce" + pending_txs.remove(ttx_s0_0.signed_tx.clone()).unwrap(), + vec![ttx_s0_0.tx_hash, ttx_s0_1.tx_hash], + "rest of transactions for account should be removed when targeting bottom nonce" ); + assert_eq!(pending_txs.txs.len(), 1, "empty account should be removed"); assert_eq!( - transaction_container.accounts.len(), - 1, - "empty account should be removed" + pending_txs.len(), + 2, + "should only have two transactions tracked" ); - assert_eq!( - transaction_container.size(), - 3, - "should only have three transactions tracked" + assert!( + pending_txs.contains_tx(&ttx_s1_0.tx_hash), + "other account should be untouched" ); - assert_eq!( - transaction_container - .accounts - .get(&signing_address_1) - .unwrap() - .size(), - 3, + assert!( + pending_txs.contains_tx(&ttx_s1_1.tx_hash), "other account should be untouched" ); } #[test] - fn transaction_container_clear_account() { - let mut transaction_container = - TransactionContainer::new(true, STRICT_SIZE_LIMIT, Duration::from_secs(2)); + fn transactions_container_clear_account() { + let mut pending_txs = PendingTransactions::new(TX_TTL); let signing_key_0 = SigningKey::from([1; 32]); - let signing_address_0 = signing_key_0.clone().verification_key().address_bytes(); + let signing_address_0 = signing_key_0.address_bytes(); let signing_key_1 = SigningKey::from([2; 32]); - let signing_address_1 = signing_key_1.clone().verification_key().address_bytes(); // transactions to add to accounts - // account, nonce, hash - let ttx_s0_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &signing_key_0, - [1; 32], - ))); - let ttx_s0_1 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 1, - &signing_key_0, - [2; 32], - ))); - let ttx_s1_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &signing_key_1, - [1; 32], - ))); + let ttx_s0_0 = mock_ttx(0, &signing_key_0); + let ttx_s0_1 = mock_ttx(1, &signing_key_0); + let ttx_s1_0 = mock_ttx(0, &signing_key_1); // clear all on empty returns zero - assert_eq!( - transaction_container - .clear_account(&signing_address_0) - .len(), - 0, + assert!( + pending_txs.clear_account(&signing_address_0).is_empty(), "zero transactions should be removed from clearing non existing accounts" ); // add transactions - transaction_container.add(ttx_s0_0.clone(), 0).unwrap(); - transaction_container.add(ttx_s0_1.clone(), 0).unwrap(); - transaction_container.add(ttx_s1_0.clone(), 0).unwrap(); + pending_txs.add(ttx_s0_0.clone(), 0).unwrap(); + pending_txs.add(ttx_s0_1.clone(), 0).unwrap(); + pending_txs.add(ttx_s1_0.clone(), 0).unwrap(); // clear should return all transactions assert_eq!( - transaction_container - .clear_account(&signing_address_0) - .len(), - 2, + pending_txs.clear_account(&signing_address_0), + vec![ttx_s0_0.tx_hash, ttx_s0_1.tx_hash], "all transactions should be returned from clearing account" ); + assert_eq!(pending_txs.txs.len(), 1, "empty account should be removed"); assert_eq!( - transaction_container.accounts.len(), - 1, - "empty account should be removed" - ); - assert_eq!( - transaction_container.size(), + pending_txs.len(), 1, "should only have one transaction tracked" ); - assert_eq!( - transaction_container - .accounts - .get(&signing_address_1) - .unwrap() - .size(), - 1, + assert!( + pending_txs.contains_tx(&ttx_s1_0.tx_hash), "other account should be untouched" ); } #[tokio::test] - #[allow(clippy::too_many_lines)] - async fn transaction_container_clean_accounts_nonce() { - let mut transaction_container = - TransactionContainer::new(true, STRICT_SIZE_LIMIT, Duration::from_secs(2)); + async fn transactions_container_clean_accounts() { + let mut pending_txs = PendingTransactions::new(TX_TTL); let signing_key_0 = SigningKey::from([1; 32]); - let signing_address_0 = signing_key_0.clone().verification_key().address_bytes(); + let signing_address_0 = signing_key_0.address_bytes(); let signing_key_1 = SigningKey::from([2; 32]); - let signing_address_1 = signing_key_1.clone().verification_key().address_bytes(); + let signing_address_1 = signing_key_1.address_bytes(); let signing_key_2 = SigningKey::from([3; 32]); - let signing_address_2 = signing_key_2.clone().verification_key().address_bytes(); + let signing_address_2 = signing_key_2.address_bytes(); // transactions to add to accounts - // account, nonce, hash - let ttx_s0_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &signing_key_0, - [1; 32], - ))); - let ttx_s0_1 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 1, - &signing_key_0, - [2; 32], - ))); - let ttx_s0_2 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 2, - &signing_key_0, - [2; 32], - ))); - let ttx_s1_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &signing_key_1, - [1; 32], - ))); - let ttx_s1_1 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 1, - &signing_key_1, - [1; 32], - ))); - let ttx_s1_2 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 2, - &signing_key_1, - [1; 32], - ))); - let ttx_s2_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &signing_key_2, - [1; 32], - ))); - let ttx_s2_1 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 1, - &signing_key_2, - [1; 32], - ))); - let ttx_s2_2 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 2, - &signing_key_2, - [1; 32], - ))); + let ttx_s0_0 = mock_ttx(0, &signing_key_0); + let ttx_s0_1 = mock_ttx(1, &signing_key_0); + let ttx_s0_2 = mock_ttx(2, &signing_key_0); + let ttx_s1_0 = mock_ttx(0, &signing_key_1); + let ttx_s1_1 = mock_ttx(1, &signing_key_1); + let ttx_s1_2 = mock_ttx(2, &signing_key_1); + let ttx_s2_0 = mock_ttx(0, &signing_key_2); + let ttx_s2_1 = mock_ttx(1, &signing_key_2); + let ttx_s2_2 = mock_ttx(2, &signing_key_2); // add transactions - transaction_container.add(ttx_s0_0.clone(), 0).unwrap(); - transaction_container.add(ttx_s0_1.clone(), 0).unwrap(); - transaction_container.add(ttx_s0_2.clone(), 0).unwrap(); - transaction_container.add(ttx_s1_0.clone(), 0).unwrap(); - transaction_container.add(ttx_s1_1.clone(), 0).unwrap(); - transaction_container.add(ttx_s1_2.clone(), 0).unwrap(); - transaction_container.add(ttx_s2_0.clone(), 0).unwrap(); - transaction_container.add(ttx_s2_1.clone(), 0).unwrap(); - transaction_container.add(ttx_s2_2.clone(), 0).unwrap(); + pending_txs.add(ttx_s0_0.clone(), 0).unwrap(); + pending_txs.add(ttx_s0_1.clone(), 0).unwrap(); + pending_txs.add(ttx_s0_2.clone(), 0).unwrap(); + pending_txs.add(ttx_s1_0.clone(), 0).unwrap(); + pending_txs.add(ttx_s1_1.clone(), 0).unwrap(); + pending_txs.add(ttx_s1_2.clone(), 0).unwrap(); + pending_txs.add(ttx_s2_0.clone(), 0).unwrap(); + pending_txs.add(ttx_s2_1.clone(), 0).unwrap(); + pending_txs.add(ttx_s2_2.clone(), 0).unwrap(); // current nonce getter // should pop none from signing_address_0, one from signing_address_1, and all from // signing_address_2 let current_account_nonce_getter = |address: [u8; 20]| async move { if address == signing_address_0 { - return Ok(0); - } - if address == signing_address_1 { - return Ok(1); - } - if address == signing_address_2 { - return Ok(4); + Ok(0) + } else if address == signing_address_1 { + Ok(1) + } else if address == signing_address_2 { + Ok(4) + } else { + Err(anyhow::anyhow!("invalid address")) } - Err(anyhow::anyhow!("invalid address")) }; - let removed_txs = transaction_container + let removed_txs = pending_txs .clean_accounts(¤t_account_nonce_getter) - .await - .unwrap(); + .await; assert_eq!( removed_txs.len(), 4, "four transactions should've been popped" ); + assert_eq!(pending_txs.txs.len(), 2, "empty accounts should be removed"); assert_eq!( - transaction_container.accounts.len(), - 2, - "empty accounts should be removed" - ); - assert_eq!( - transaction_container.size(), + pending_txs.len(), 5, "5 transactions should be remaining from original 9" ); + assert!(pending_txs.contains_tx(&ttx_s0_0.tx_hash)); + assert!(pending_txs.contains_tx(&ttx_s0_1.tx_hash)); + assert!(pending_txs.contains_tx(&ttx_s0_2.tx_hash)); + assert!(pending_txs.contains_tx(&ttx_s1_1.tx_hash)); + assert!(pending_txs.contains_tx(&ttx_s1_2.tx_hash)); + assert_eq!( - transaction_container - .accounts - .get(&signing_address_0) - .unwrap() - .size(), + pending_txs.txs.get(&signing_address_0).unwrap().txs().len(), 3 ); assert_eq!( - transaction_container - .accounts - .get(&signing_address_1) - .unwrap() - .size(), + pending_txs.txs.get(&signing_address_1).unwrap().txs().len(), 2 ); for (_, reason) in removed_txs { @@ -1699,358 +1305,262 @@ mod test { } #[tokio::test(start_paused = true)] - async fn transaction_container_clean_accounts_expired_transactions() { - let mut transaction_container = - TransactionContainer::new(true, STRICT_SIZE_LIMIT, Duration::from_secs(2)); + async fn transactions_container_clean_accounts_expired_transactions() { + let mut pending_txs = PendingTransactions::new(TX_TTL); let signing_key_0 = SigningKey::from([1; 32]); - let signing_address_0 = signing_key_0.clone().verification_key().address_bytes(); + let signing_address_0 = signing_key_0.address_bytes(); let signing_key_1 = SigningKey::from([2; 32]); - let signing_address_1 = signing_key_1.clone().verification_key().address_bytes(); + let signing_address_1 = signing_key_1.address_bytes(); // transactions to add to accounts - // account, nonce, hash - let ttx_s0_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &signing_key_0, - [1; 32], - ))); + let ttx_s0_0 = mock_ttx(0, &signing_key_0); // pass time to make first transaction stale - tokio::time::advance(Duration::from_secs(5)).await; + tokio::time::advance(TX_TTL.saturating_add(Duration::from_nanos(1))).await; - let ttx_s0_1 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 1, - &signing_key_0, - [2; 32], - ))); - let ttx_s1_0 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 0, - &signing_key_1, - [1; 32], - ))); + let ttx_s0_1 = mock_ttx(1, &signing_key_0); + let ttx_s1_0 = mock_ttx(0, &signing_key_1); // add transactions - transaction_container.add(ttx_s0_0.clone(), 0).unwrap(); - transaction_container.add(ttx_s0_1.clone(), 0).unwrap(); - transaction_container.add(ttx_s1_0.clone(), 0).unwrap(); + pending_txs.add(ttx_s0_0.clone(), 0).unwrap(); + pending_txs.add(ttx_s0_1.clone(), 0).unwrap(); + pending_txs.add(ttx_s1_0.clone(), 0).unwrap(); // current nonce getter // all nonces should be valid let current_account_nonce_getter = |address: [u8; 20]| async move { - if address == signing_address_0 { - return Ok(0); - } - if address == signing_address_1 { + if address == signing_address_0 || address == signing_address_1 { return Ok(0); } Err(anyhow::anyhow!("invalid address")) }; - let mut removed_txs = transaction_container + let removed_txs = pending_txs .clean_accounts(¤t_account_nonce_getter) - .await - .unwrap(); + .await; assert_eq!( removed_txs.len(), 2, "two transactions should've been popped" ); + assert_eq!(pending_txs.txs.len(), 1, "empty accounts should be removed"); assert_eq!( - transaction_container.accounts.len(), - 1, - "empty accounts should be removed" - ); - assert_eq!( - transaction_container.size(), + pending_txs.len(), 1, "1 transaction should be remaining from original 3" ); - assert_eq!( - transaction_container - .accounts - .get(&signing_address_1) - .unwrap() - .size(), - 1, - "not expired account should have expected transactions" + assert!( + pending_txs.contains_tx(&ttx_s1_0.tx_hash), + "not expired account should be untouched" ); // check removal reasons - let first_pop = removed_txs.pop().expect("should have tx to pop"); assert_eq!( - first_pop.0.tx_hash(), - ttx_s0_1.tx_hash(), - "first pop should be last pushed tx, which should be the second tx" - ); - assert!( - matches!(first_pop.1, RemovalReason::LowerNonceInvalidated), - "first transaction's removal reason should be lower nonce invalidation" + removed_txs[0], + (ttx_s0_0.tx_hash, RemovalReason::Expired), + "first should be first pushed tx with removal reason as expired" ); - let second_pop = removed_txs.pop().expect("should have another tx to pop"); assert_eq!( - second_pop.0.tx_hash(), - ttx_s0_0.tx_hash(), - "second pop should be first added tx, to verify for next check" + removed_txs[1], + (ttx_s0_1.tx_hash, RemovalReason::LowerNonceInvalidated), + "second should be second added tx with removal reason as lower nonce invalidation" ); + } + + #[test] + fn pending_transactions_pending_nonce() { + let mut pending_txs = PendingTransactions::new(TX_TTL); + let signing_key_0 = SigningKey::from([1; 32]); + let signing_address_0 = signing_key_0.address_bytes(); + + let signing_key_1 = SigningKey::from([2; 32]); + let signing_address_1 = signing_key_1.address_bytes(); + + // transactions to add for account 0 + let ttx_s0_0 = mock_ttx(0, &signing_key_0); + let ttx_s0_1 = mock_ttx(1, &signing_key_0); + + pending_txs.add(ttx_s0_0, 0).unwrap(); + pending_txs.add(ttx_s0_1, 0).unwrap(); + + // empty account returns zero assert!( - matches!(second_pop.1, RemovalReason::Expired), - "first transaction's removal reason should be expiration" + pending_txs.pending_nonce(signing_address_1).is_none(), + "empty account should return None" + ); + + // non empty account returns highest nonce + assert_eq!( + pending_txs.pending_nonce(signing_address_0), + Some(1), + "should return highest nonce" ); } #[tokio::test] - async fn transaction_container_find_promotables() { - let mut transaction_container = - TransactionContainer::new(false, UNSTRICT_SIZE_LIMIT, Duration::from_secs(2)); + async fn pending_transactions_builder_queue() { + let mut pending_txs = PendingTransactions::new(TX_TTL); let signing_key_0 = SigningKey::from([1; 32]); - let signing_address_0 = signing_key_0.clone().verification_key().address_bytes(); + let signing_address_0 = signing_key_0.address_bytes(); let signing_key_1 = SigningKey::from([2; 32]); - let signing_address_1 = signing_key_1.clone().verification_key().address_bytes(); + let signing_address_1 = signing_key_1.address_bytes(); // transactions to add to accounts - // account, nonce, hash - let ttx_s0_1 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 1, - &signing_key_0, - [1; 32], - ))); - let ttx_s0_2 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 2, - &signing_key_0, - [2; 32], - ))); - let ttx_s0_3 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 3, - &signing_key_0, - [2; 32], - ))); - let ttx_s1_1 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 1, - &signing_key_1, - [1; 32], - ))); - let ttx_s1_2 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 2, - &signing_key_1, - [1; 32], - ))); - let ttx_s1_4 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 4, - &signing_key_1, - [1; 32], - ))); + let ttx_s0_1 = mock_ttx(1, &signing_key_0); + let ttx_s1_1 = mock_ttx(1, &signing_key_1); + let ttx_s1_2 = mock_ttx(2, &signing_key_1); + let ttx_s1_3 = mock_ttx(3, &signing_key_1); // add transactions - transaction_container.add(ttx_s0_1.clone(), 0).unwrap(); - transaction_container.add(ttx_s0_2.clone(), 0).unwrap(); - transaction_container.add(ttx_s0_3.clone(), 0).unwrap(); - transaction_container.add(ttx_s1_1.clone(), 0).unwrap(); - transaction_container.add(ttx_s1_2.clone(), 0).unwrap(); - transaction_container.add(ttx_s1_4.clone(), 0).unwrap(); + pending_txs.add(ttx_s0_1.clone(), 1).unwrap(); + pending_txs.add(ttx_s1_1.clone(), 1).unwrap(); + pending_txs.add(ttx_s1_2.clone(), 1).unwrap(); + pending_txs.add(ttx_s1_3.clone(), 1).unwrap(); // current nonce getter - // should pop all from signing_address_0 and two from signing_address_1 + // should return all transactions from signing_key_0 and last two from signing_key_1 let current_account_nonce_getter = |address: [u8; 20]| async move { if address == signing_address_0 { - return Ok(1); - } - if address == signing_address_1 { - return Ok(1); + Ok(1) + } else if address == signing_address_1 { + Ok(2) + } else { + Err(anyhow::anyhow!("invalid address")) } - Err(anyhow::anyhow!("invalid address")) }; + // get builder queue + let builder_queue = pending_txs + .builder_queue(¤t_account_nonce_getter) + .await + .expect("building builders queue should work"); assert_eq!( - transaction_container - .find_promotables(¤t_account_nonce_getter) - .await - .expect("find promotables should work") - .len(), - 5, - "five transactions should've been popped" + builder_queue.len(), + 3, + "three transactions should've been popped" ); + + // check that the transactions are in the expected order + let (first_tx_hash, _) = builder_queue[0]; assert_eq!( - transaction_container.accounts.len(), - 1, - "empty accounts should be removed" + first_tx_hash, ttx_s0_1.tx_hash, + "expected earliest transaction with lowest nonce difference (0) to be first" ); + let (second_tx_hash, _) = builder_queue[1]; assert_eq!( - transaction_container.size(), - 1, - "1 transactions should be remaining from original 6" + second_tx_hash, ttx_s1_2.tx_hash, + "expected other low nonce diff (0) to be second" ); + let (third_tx_hash, _) = builder_queue[2]; assert_eq!( - transaction_container - .accounts - .get(&signing_address_1) - .unwrap() - .size(), - 1 + third_tx_hash, ttx_s1_3.tx_hash, + "expected highest nonce diff to be last" + ); + + // ensure transactions not removed + assert_eq!( + pending_txs.len(), + 4, + "no transactions should've been removed" ); } #[tokio::test] - async fn transaction_container_pop_front_account() { - let mut transaction_container = - TransactionContainer::new(false, UNSTRICT_SIZE_LIMIT, Duration::from_secs(2)); + async fn parked_transactions_pop_front_account() { + let mut parked_txs = ParkedTransactions::::new(TX_TTL); let signing_key_0 = SigningKey::from([1; 32]); - let signing_address_0 = signing_key_0.clone().verification_key().address_bytes(); + let signing_address_0 = signing_key_0.address_bytes(); let signing_key_1 = SigningKey::from([2; 32]); - let signing_address_1 = signing_key_1.clone().verification_key().address_bytes(); + let signing_address_1 = signing_key_1.address_bytes(); // transactions to add to accounts - // account, nonce, hash - let ttx_s0_1 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 1, - &signing_key_0, - [1; 32], - ))); - let ttx_s1_1 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 1, - &signing_key_1, - [1; 32], - ))); - let ttx_s1_2 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 2, - &signing_key_1, - [1; 32], - ))); - let ttx_s1_4 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 4, - &signing_key_1, - [1; 32], - ))); + let ttx_s0_1 = mock_ttx(1, &signing_key_0); + let ttx_s1_1 = mock_ttx(1, &signing_key_1); + let ttx_s1_2 = mock_ttx(2, &signing_key_1); + let ttx_s1_4 = mock_ttx(4, &signing_key_1); // add transactions - transaction_container.add(ttx_s0_1.clone(), 0).unwrap(); - transaction_container.add(ttx_s1_1.clone(), 0).unwrap(); - transaction_container.add(ttx_s1_2.clone(), 0).unwrap(); - transaction_container.add(ttx_s1_4.clone(), 0).unwrap(); + parked_txs.add(ttx_s0_1.clone(), 0).unwrap(); + parked_txs.add(ttx_s1_1.clone(), 0).unwrap(); + parked_txs.add(ttx_s1_2.clone(), 0).unwrap(); + parked_txs.add(ttx_s1_4.clone(), 0).unwrap(); // pop from account 1 assert_eq!( - transaction_container - .pop_front_account(&signing_address_0, 1) - .len(), + parked_txs.pop_front_account(&signing_address_0, 1).len(), 1, "one transactions should've been popped" ); - assert_eq!( - transaction_container.accounts.len(), - 1, - "empty accounts should be removed" - ); + assert_eq!(parked_txs.txs.len(), 1, "empty accounts should be removed"); // pop from account 2 assert_eq!( - transaction_container - .pop_front_account(&signing_address_1, 1) - .len(), + parked_txs.pop_front_account(&signing_address_1, 1).len(), 2, "two transactions should've been popped" ); assert_eq!( - transaction_container.accounts.len(), + parked_txs.txs.len(), 1, "non empty accounts should not be removed" ); assert_eq!( - transaction_container.size(), + parked_txs.len(), 1, "1 transactions should be remaining from original 4" ); + assert!(parked_txs.contains_tx(&ttx_s1_4.tx_hash)); } #[tokio::test] - async fn transaction_container_builder_queue() { - let mut transaction_container = - TransactionContainer::new(true, STRICT_SIZE_LIMIT, Duration::from_secs(2)); + async fn parked_transactions_find_promotables() { + let mut parked_txs = ParkedTransactions::::new(TX_TTL); let signing_key_0 = SigningKey::from([1; 32]); - let signing_address_0 = signing_key_0.clone().verification_key().address_bytes(); + let signing_address_0 = signing_key_0.address_bytes(); let signing_key_1 = SigningKey::from([2; 32]); - let signing_address_1 = signing_key_1.clone().verification_key().address_bytes(); + let signing_address_1 = signing_key_1.address_bytes(); // transactions to add to accounts - // account, nonce, hash - let ttx_s0_1 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 1, - &signing_key_0, - [1; 32], - ))); - let ttx_s1_1 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 1, - &signing_key_1, - [1; 32], - ))); - let ttx_s1_2 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 2, - &signing_key_1, - [1; 32], - ))); - let ttx_s1_3 = Arc::new(TimemarkedTransaction::new(get_mock_tx_parameterized( - 3, - &signing_key_1, - [1; 32], - ))); + let ttx_s0_1 = mock_ttx(1, &signing_key_0); + let ttx_s0_2 = mock_ttx(2, &signing_key_0); + let ttx_s0_3 = mock_ttx(3, &signing_key_0); + let ttx_s1_1 = mock_ttx(1, &signing_key_1); + let ttx_s1_2 = mock_ttx(2, &signing_key_1); + let ttx_s1_4 = mock_ttx(4, &signing_key_1); // add transactions - transaction_container.add(ttx_s0_1.clone(), 1).unwrap(); - transaction_container.add(ttx_s1_1.clone(), 1).unwrap(); - transaction_container.add(ttx_s1_2.clone(), 1).unwrap(); - transaction_container.add(ttx_s1_3.clone(), 1).unwrap(); + parked_txs.add(ttx_s0_1.clone(), 0).unwrap(); + parked_txs.add(ttx_s0_2.clone(), 0).unwrap(); + parked_txs.add(ttx_s0_3.clone(), 0).unwrap(); + parked_txs.add(ttx_s1_1.clone(), 0).unwrap(); + parked_txs.add(ttx_s1_2.clone(), 0).unwrap(); + parked_txs.add(ttx_s1_4.clone(), 0).unwrap(); // current nonce getter - // should return all transactions from signing_key_0 and last two from signing_key_1 + // should pop all from signing_address_0 and two from signing_address_1 let current_account_nonce_getter = |address: [u8; 20]| async move { - if address == signing_address_0 { + if address == signing_address_0 || address == signing_address_1 { return Ok(1); } - if address == signing_address_1 { - return Ok(2); - } Err(anyhow::anyhow!("invalid address")) }; - // get builder queue - let mut builder_queue = transaction_container - .builder_queue(¤t_account_nonce_getter) - .await - .expect("building builders queue should work"); - assert_eq!( - builder_queue.len(), - 3, - "three transactions should've been popped" - ); - - // check that the transactions are in the expected order - let (first_tx, _) = builder_queue.pop().unwrap(); assert_eq!( - first_tx.address(), - &signing_address_0, - "expected earliest transaction with lowest nonce difference to be first" - ); - let (second_tx, _) = builder_queue.pop().unwrap(); - assert_eq!( - second_tx.address(), - &signing_address_1, - "expected lower nonce diff to be second" - ); - assert_eq!(second_tx.signed_tx().nonce(), 2); - let (third_tx, _) = builder_queue.pop().unwrap(); - assert_eq!( - third_tx.address(), - &signing_address_1, - "expected highest nonce diff to be last" + parked_txs + .find_promotables(¤t_account_nonce_getter) + .await + .len(), + 5, + "five transactions should've been popped" ); - assert_eq!(third_tx.signed_tx().nonce(), 3); - - // ensure transactions not removed + assert_eq!(parked_txs.txs.len(), 1, "empty accounts should be removed"); assert_eq!( - transaction_container.size(), - 4, - "no transactions should've been removed" + parked_txs.len(), + 1, + "1 transactions should be remaining from original 6" ); + assert!(parked_txs.contains_tx(&ttx_s1_4.tx_hash)); } } diff --git a/crates/astria-sequencer/src/service/consensus.rs b/crates/astria-sequencer/src/service/consensus.rs index b9da05020e..013762155a 100644 --- a/crates/astria-sequencer/src/service/consensus.rs +++ b/crates/astria-sequencer/src/service/consensus.rs @@ -202,6 +202,7 @@ mod test { use std::{ collections::HashMap, str::FromStr, + sync::Arc, }; use astria_core::{ @@ -287,12 +288,12 @@ mod test { let (mut consensus_service, mempool) = new_consensus_service(Some(signing_key.verification_key())).await; let tx = make_unsigned_tx(); - let signed_tx = tx.into_signed(&signing_key); - let tx_bytes = signed_tx.clone().into_raw().encode_to_vec(); + let signed_tx = Arc::new(tx.into_signed(&signing_key)); + let tx_bytes = signed_tx.to_raw().encode_to_vec(); let txs = vec![tx_bytes.into()]; mempool.insert(signed_tx.clone(), 0).await.unwrap(); - let res = generate_rollup_datas_commitment(&vec![signed_tx], HashMap::new()); + let res = generate_rollup_datas_commitment(&vec![(*signed_tx).clone()], HashMap::new()); let prepare_proposal = new_prepare_proposal_request(); let prepare_proposal_response = consensus_service @@ -492,10 +493,10 @@ mod test { new_consensus_service(Some(signing_key.verification_key())).await; let tx = make_unsigned_tx(); - let signed_tx = tx.into_signed(&signing_key); - let tx_bytes = signed_tx.clone().into_raw().encode_to_vec(); + let signed_tx = Arc::new(tx.into_signed(&signing_key)); + let tx_bytes = signed_tx.to_raw().encode_to_vec(); let txs = vec![tx_bytes.clone().into()]; - let res = generate_rollup_datas_commitment(&vec![signed_tx.clone()], HashMap::new()); + let res = generate_rollup_datas_commitment(&vec![(*signed_tx).clone()], HashMap::new()); let block_data = res.into_transactions(txs.clone()); let data_hash = diff --git a/crates/astria-sequencer/src/service/mempool.rs b/crates/astria-sequencer/src/service/mempool.rs index 1f315483fe..ffe740ed7f 100644 --- a/crates/astria-sequencer/src/service/mempool.rs +++ b/crates/astria-sequencer/src/service/mempool.rs @@ -1,5 +1,6 @@ use std::{ pin::Pin, + sync::Arc, task::{ Context, Poll, @@ -167,7 +168,10 @@ async fn handle_check_tx