diff --git a/Cargo.lock b/Cargo.lock index d6bea69d74..dfd9a0a4b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -802,7 +802,6 @@ dependencies = [ "penumbra-ibc", "penumbra-proto", "penumbra-tower-trace", - "priority-queue", "prost", "rand 0.8.5", "rand_chacha 0.3.1", @@ -5947,17 +5946,6 @@ dependencies = [ "uint", ] -[[package]] -name = "priority-queue" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "509354d8a769e8d0b567d6821b84495c60213162761a732d68ce87c964bd347f" -dependencies = [ - "autocfg", - "equivalent", - "indexmap 2.2.6", -] - [[package]] name = "proc-macro-crate" version = "1.3.1" diff --git a/crates/astria-core/src/protocol/abci.rs b/crates/astria-core/src/protocol/abci.rs index 6f0d711e63..c70c9165dd 100644 --- a/crates/astria-core/src/protocol/abci.rs +++ b/crates/astria-core/src/protocol/abci.rs @@ -16,7 +16,9 @@ impl AbciErrorCode { pub const VALUE_NOT_FOUND: Self = Self(unsafe { NonZeroU32::new_unchecked(8) }); pub const TRANSACTION_EXPIRED: Self = Self(unsafe { NonZeroU32::new_unchecked(9) }); pub const TRANSACTION_FAILED: Self = Self(unsafe { NonZeroU32::new_unchecked(10) }); - pub const BAD_REQUEST: Self = Self(unsafe { NonZeroU32::new_unchecked(11) }); + pub const TRANSACTION_INSERTION_FAILED: Self = Self(unsafe { NonZeroU32::new_unchecked(11) }); + pub const LOWER_NONCE_INVALIDATED: Self = Self(unsafe { NonZeroU32::new_unchecked(12) }); + pub const BAD_REQUEST: Self = Self(unsafe { NonZeroU32::new_unchecked(13) }); } impl AbciErrorCode { @@ -42,6 +44,10 @@ impl AbciErrorCode { Self::TRANSACTION_FAILED => { "the transaction failed to execute in prepare_proposal()".into() } + Self::TRANSACTION_INSERTION_FAILED => { + "the transaction failed insertion into the mempool".into() + } + Self::LOWER_NONCE_INVALIDATED => "lower nonce was invalidated in mempool".into(), Self::BAD_REQUEST => "the request payload was malformed".into(), Self(other) => { format!("invalid error code {other}: should be unreachable 
(this is a bug)") diff --git a/crates/astria-sequencer/Cargo.toml b/crates/astria-sequencer/Cargo.toml index d487930dcb..3647d34af5 100644 --- a/crates/astria-sequencer/Cargo.toml +++ b/crates/astria-sequencer/Cargo.toml @@ -30,7 +30,6 @@ cnidarium = { git = "https://github.com/penumbra-zone/penumbra.git", tag = "v0.7 ] } ibc-proto = { version = "0.41.0", features = ["server"] } matchit = "0.7.2" -priority-queue = "2.0.2" tower = "0.4" tower-abci = "0.12.0" tower-actor = "0.1.0" diff --git a/crates/astria-sequencer/src/app/mod.rs b/crates/astria-sequencer/src/app/mod.rs index 13ec8393c4..522457ecd0 100644 --- a/crates/astria-sequencer/src/app/mod.rs +++ b/crates/astria-sequencer/src/app/mod.rs @@ -63,9 +63,10 @@ use tracing::{ }; use crate::{ - accounts, accounts::{ + self, component::AccountsComponent, + StateReadExt, StateWriteExt as _, }, address::StateWriteExt as _, @@ -484,11 +485,21 @@ impl App { let mut included_signed_txs = Vec::new(); let mut failed_tx_count: usize = 0; let mut execution_results = Vec::new(); - let mut txs_to_readd_to_mempool = Vec::new(); + let mut excluded_txs: usize = 0; + + // get copy of transactions to execute from mempool + let current_account_nonce_getter = + |address: [u8; 20]| self.state.get_account_nonce(address); + let pending_txs = self + .mempool + .builder_queue(current_account_nonce_getter) + .await + .expect("failed to fetch pending transactions"); - while let Some((enqueued_tx, priority)) = self.mempool.pop().await { - let tx_hash_base64 = telemetry::display::base64(&enqueued_tx.tx_hash()).to_string(); - let tx = enqueued_tx.signed_tx(); + let mut unused_count = pending_txs.len(); + for (tx_hash, tx) in pending_txs { + unused_count = unused_count.saturating_sub(1); + let tx_hash_base64 = telemetry::display::base64(&tx_hash).to_string(); let bytes = tx.to_raw().encode_to_vec(); let tx_len = bytes.len(); info!(transaction_hash = %tx_hash_base64, "executing transaction"); @@ -503,7 +514,7 @@ impl App { tx_data_bytes = 
tx_len, "excluding remaining transactions: max cometBFT data limit reached" ); - txs_to_readd_to_mempool.push((enqueued_tx, priority)); + excluded_txs = excluded_txs.saturating_add(1); // break from loop, as the block is full break; @@ -526,7 +537,7 @@ impl App { tx_data_bytes = tx_sequence_data_bytes, "excluding transaction: max block sequenced data limit reached" ); - txs_to_readd_to_mempool.push((enqueued_tx, priority)); + excluded_txs = excluded_txs.saturating_add(1); // continue as there might be non-sequence txs that can fit continue; @@ -558,18 +569,21 @@ impl App { ); if e.downcast_ref::().is_some() { - // we re-insert the tx into the mempool if it failed to execute + // we don't remove the tx from mempool if it failed to execute // due to an invalid nonce, as it may be valid in the future. // if it's invalid due to the nonce being too low, it'll be // removed from the mempool in `update_mempool_after_finalization`. - txs_to_readd_to_mempool.push((enqueued_tx, priority)); } else { failed_tx_count = failed_tx_count.saturating_add(1); - // the transaction should be removed from the cometbft mempool + // remove the failing transaction from the mempool + // + // this will remove any transactions from the same sender + // as well, as the dependent nonces will not be able + // to execute self.mempool - .track_removal_comet_bft( - enqueued_tx.tx_hash(), + .remove_tx_invalid( + tx, RemovalReason::FailedPrepareProposal(e.to_string()), ) .await; @@ -586,15 +600,12 @@ impl App { ); } self.metrics.set_prepare_proposal_excluded_transactions( - txs_to_readd_to_mempool - .len() - .saturating_add(failed_tx_count), + excluded_txs.saturating_add(failed_tx_count), ); - self.mempool.insert_all(txs_to_readd_to_mempool).await; - let mempool_len = self.mempool.len().await; - debug!(mempool_len, "finished executing transactions from mempool"); - self.metrics.set_transactions_in_mempool_total(mempool_len); + debug!("{unused_count} leftover pending transactions"); + self.metrics + 
.set_transactions_in_mempool_total(self.mempool.len().await); self.execution_results = Some(execution_results); Ok((validated_txs, included_signed_txs)) @@ -805,10 +816,6 @@ impl App { // skip the first two transactions, as they are the rollup data commitments for tx in finalize_block.txs.iter().skip(2) { - // remove any included txs from the mempool - let tx_hash = Sha256::digest(tx).into(); - self.mempool.remove(tx_hash).await; - let signed_tx = signed_transaction_from_bytes(tx) .context("protocol error; only valid txs should be finalized")?; @@ -880,6 +887,10 @@ impl App { state_tx .put_sequencer_block(sequencer_block) .context("failed to write sequencer block to state")?; + + // update the priority of any txs in the mempool based on the updated app state + update_mempool_after_finalization(&mut self.mempool, &state_tx).await; + // events that occur after end_block are ignored here; // there should be none anyways. let _ = self.apply(state_tx); @@ -890,11 +901,6 @@ impl App { .await .context("failed to prepare commit")?; - // update the priority of any txs in the mempool based on the updated app state - update_mempool_after_finalization(&mut self.mempool, self.state.clone()) - .await - .context("failed to update mempool after finalization")?; - Ok(abci::response::FinalizeBlock { events: end_block.events, validator_updates: end_block.validator_updates, @@ -1121,10 +1127,10 @@ impl App { // the mempool is large. async fn update_mempool_after_finalization( mempool: &mut Mempool, - state: S, -) -> anyhow::Result<()> { + state: &S, +) { let current_account_nonce_getter = |address: [u8; 20]| state.get_account_nonce(address); - mempool.run_maintenance(current_account_nonce_getter).await + mempool.run_maintenance(current_account_nonce_getter).await; } /// relevant data of a block being executed. 
diff --git a/crates/astria-sequencer/src/app/test_utils.rs b/crates/astria-sequencer/src/app/test_utils.rs index 09b3571a0a..ebb1e9eabb 100644 --- a/crates/astria-sequencer/src/app/test_utils.rs +++ b/crates/astria-sequencer/src/app/test_utils.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use astria_core::{ crypto::SigningKey, primitive::v1::RollupId, @@ -138,7 +140,11 @@ pub(crate) async fn initialize_app( app } -pub(crate) fn get_mock_tx(nonce: u32) -> SignedTransaction { +pub(crate) fn mock_tx( + nonce: u32, + signer: &SigningKey, + rollup_name: &str, +) -> Arc { let tx = UnsignedTransaction { params: TransactionParams::builder() .nonce(nonce) @@ -146,7 +152,7 @@ pub(crate) fn get_mock_tx(nonce: u32) -> SignedTransaction { .build(), actions: vec![ SequenceAction { - rollup_id: RollupId::from_unhashed_bytes([0; 32]), + rollup_id: RollupId::from_unhashed_bytes(rollup_name.as_bytes()), data: Bytes::from_static(&[0x99]), fee_asset: "astria".parse().unwrap(), } @@ -154,5 +160,5 @@ pub(crate) fn get_mock_tx(nonce: u32) -> SignedTransaction { ], }; - tx.into_signed(&get_alice_signing_key()) + Arc::new(tx.into_signed(signer)) } diff --git a/crates/astria-sequencer/src/app/tests_app.rs b/crates/astria-sequencer/src/app/tests_app.rs index 4e6b6dbfe6..58ec904b17 100644 --- a/crates/astria-sequencer/src/app/tests_app.rs +++ b/crates/astria-sequencer/src/app/tests_app.rs @@ -452,7 +452,7 @@ async fn app_execution_results_match_proposal_vs_after_proposal() { // don't commit the result, now call prepare_proposal with the same data. // this will reset the app state. // this simulates executing the same block as a validator (specifically the proposer). 
- app.mempool.insert(signed_tx, 0).await.unwrap(); + app.mempool.insert(Arc::new(signed_tx), 0).await.unwrap(); let proposer_address = [88u8; 20].to_vec().try_into().unwrap(); let prepare_proposal = PrepareProposal { @@ -473,6 +473,12 @@ async fn app_execution_results_match_proposal_vs_after_proposal() { assert_eq!(prepare_proposal_result.txs, finalize_block.txs); assert_eq!(app.executed_proposal_hash, Hash::default()); assert_eq!(app.validator_address.unwrap(), proposer_address); + // run maintence to clear out transactions + let current_account_nonce_getter = |address: [u8; 20]| app.state.get_account_nonce(address); + app.mempool + .run_maintenance(current_account_nonce_getter) + .await; + assert_eq!(app.mempool.len().await, 0); // call process_proposal - should not re-execute anything. @@ -561,8 +567,8 @@ async fn app_prepare_proposal_cometbft_max_bytes_overflow_ok() { } .into_signed(&alice); - app.mempool.insert(tx_pass, 0).await.unwrap(); - app.mempool.insert(tx_overflow, 0).await.unwrap(); + app.mempool.insert(Arc::new(tx_pass), 0).await.unwrap(); + app.mempool.insert(Arc::new(tx_overflow), 0).await.unwrap(); // send to prepare_proposal let prepare_args = abci::request::PrepareProposal { @@ -581,6 +587,12 @@ async fn app_prepare_proposal_cometbft_max_bytes_overflow_ok() { .await .expect("too large transactions should not cause prepare proposal to fail"); + // run maintence to clear out transactions + let current_account_nonce_getter = |address: [u8; 20]| app.state.get_account_nonce(address); + app.mempool + .run_maintenance(current_account_nonce_getter) + .await; + // see only first tx made it in assert_eq!( result.txs.len(), @@ -634,8 +646,8 @@ async fn app_prepare_proposal_sequencer_max_bytes_overflow_ok() { } .into_signed(&alice); - app.mempool.insert(tx_pass, 0).await.unwrap(); - app.mempool.insert(tx_overflow, 0).await.unwrap(); + app.mempool.insert(Arc::new(tx_pass), 0).await.unwrap(); + app.mempool.insert(Arc::new(tx_overflow), 0).await.unwrap(); // 
send to prepare_proposal let prepare_args = abci::request::PrepareProposal { @@ -654,6 +666,12 @@ async fn app_prepare_proposal_sequencer_max_bytes_overflow_ok() { .await .expect("too large transactions should not cause prepare proposal to fail"); + // run maintence to clear out transactions + let current_account_nonce_getter = |address: [u8; 20]| app.state.get_account_nonce(address); + app.mempool + .run_maintenance(current_account_nonce_getter) + .await; + // see only first tx made it in assert_eq!( result.txs.len(), diff --git a/crates/astria-sequencer/src/grpc/sequencer.rs b/crates/astria-sequencer/src/grpc/sequencer.rs index 44c941f172..f4c89ff392 100644 --- a/crates/astria-sequencer/src/grpc/sequencer.rs +++ b/crates/astria-sequencer/src/grpc/sequencer.rs @@ -264,13 +264,20 @@ mod test { let alice = get_alice_signing_key(); let alice_address = astria_address(&alice.address_bytes()); - let nonce = 99; - let tx = crate::app::test_utils::get_mock_tx(nonce); + // insert a transaction with a nonce gap + let gapped_nonce = 99; + let tx = crate::app::test_utils::mock_tx(gapped_nonce, &get_alice_signing_key(), "test"); mempool.insert(tx, 0).await.unwrap(); - // insert a tx with lower nonce also, but we should get the highest nonce - let lower_nonce = 98; - let tx = crate::app::test_utils::get_mock_tx(lower_nonce); + // insert a transaction at the current nonce + let account_nonce = 0; + let tx = crate::app::test_utils::mock_tx(account_nonce, &get_alice_signing_key(), "test"); + mempool.insert(tx, 0).await.unwrap(); + + // insert a transactions one above account nonce (not gapped) + let sequential_nonce = 1; + let tx: Arc = + crate::app::test_utils::mock_tx(sequential_nonce, &get_alice_signing_key(), "test"); mempool.insert(tx, 0).await.unwrap(); let server = Arc::new(SequencerServer::new(storage.clone(), mempool)); @@ -279,7 +286,7 @@ mod test { }; let request = Request::new(request); let response = server.get_pending_nonce(request).await.unwrap(); - 
assert_eq!(response.into_inner().inner, nonce); + assert_eq!(response.into_inner().inner, sequential_nonce); } #[tokio::test] diff --git a/crates/astria-sequencer/src/mempool/benchmarks.rs b/crates/astria-sequencer/src/mempool/benchmarks.rs index dacf377f00..d8dbb7e878 100644 --- a/crates/astria-sequencer/src/mempool/benchmarks.rs +++ b/crates/astria-sequencer/src/mempool/benchmarks.rs @@ -2,7 +2,10 @@ use std::{ collections::HashMap, - sync::OnceLock, + sync::{ + Arc, + OnceLock, + }, time::Duration, }; @@ -25,7 +28,6 @@ use astria_core::{ UnsignedTransaction, }, }; -use bytes::Bytes; use sha2::{ Digest as _, Sha256, @@ -57,8 +59,8 @@ fn signing_keys() -> impl Iterator { } /// Returns a static ref to a collection of `MAX_INITIAL_TXS + 1` transactions. -fn transactions() -> &'static Vec { - static TXS: OnceLock> = OnceLock::new(); +fn transactions() -> &'static Vec> { + static TXS: OnceLock>> = OnceLock::new(); TXS.get_or_init(|| { let mut nonces_and_chain_ids = HashMap::new(); signing_keys() @@ -76,14 +78,15 @@ fn transactions() -> &'static Vec { .build(); let sequence_action = SequenceAction { rollup_id: RollupId::new([1; 32]), - data: Bytes::from_static(&[2; 1000]), + data: vec![2; 1000].into(), fee_asset: Denom::IbcPrefixed(IbcPrefixed::new([3; 32])), }; - UnsignedTransaction { + let tx = UnsignedTransaction { actions: vec![Action::Sequence(sequence_action)], params, } - .into_signed(signing_key) + .into_signed(signing_key); + Arc::new(tx) }) .take(MAX_INITIAL_TXS + 1) .collect() @@ -161,8 +164,10 @@ fn init_mempool() -> Mempool { for i in 0..super::REMOVAL_CACHE_SIZE { let hash = Sha256::digest(i.to_le_bytes()).into(); mempool - .track_removal_comet_bft(hash, RemovalReason::Expired) - .await; + .comet_bft_removal_cache + .write() + .await + .add(hash, RemovalReason::Expired); } }); mempool @@ -170,7 +175,7 @@ fn init_mempool() -> Mempool { /// Returns the first transaction from the static `transactions()` not included in the initialized /// mempool, i.e. 
the one at index `T::size()`. -fn get_unused_tx() -> SignedTransaction { +fn get_unused_tx() -> Arc { transactions().get(T::checked_size()).unwrap().clone() } @@ -199,7 +204,9 @@ fn insert(bencher: divan::Bencher) { }); } -/// Benchmarks `Mempool::pop` on a mempool with the given number of existing entries. +/// Benchmarks `Mempool::builder_queue` on a mempool with the given number of existing entries. +/// +/// Note: this benchmark doesn't capture the nuances of dealing with parked vs pending transactions. #[divan::bench( max_time = MAX_TIME, types = [ @@ -209,22 +216,30 @@ fn insert(bencher: divan::Bencher) { mempool_with_100000_txs ] )] -fn pop(bencher: divan::Bencher) { +fn builder_queue(bencher: divan::Bencher) { let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); + let mocked_current_account_nonce_getter = |_: [u8; 20]| async move { Ok(0_u32) }; bencher .with_inputs(|| init_mempool::()) .bench_values(move |mempool| { runtime.block_on(async { - mempool.pop().await.unwrap(); + mempool + .builder_queue(mocked_current_account_nonce_getter) + .await + .unwrap(); }); }); } -/// Benchmarks `Mempool::remove` for a single transaction on a mempool with the given number of -/// existing entries. +/// Benchmarks `Mempool::remove_tx_invalid` for a single transaction on a mempool with the given +/// number of existing entries. +/// +/// Note about this benchmark: `remove_tx_invalid()` will remove all higher nonces. To keep this +/// benchmark comparable with the previous mempool, we're removing the highest nonce. In the future +/// it would be better to have this bench remove the midpoint. 
#[divan::bench( max_time = MAX_TIME, types = [ @@ -234,42 +249,23 @@ fn pop(bencher: divan::Bencher) { mempool_with_100000_txs ] )] -fn remove(bencher: divan::Bencher) { - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - bencher - .with_inputs(|| { - let tx_hash = transactions().first().unwrap().sha256_of_proto_encoding(); - (init_mempool::(), tx_hash) - }) - .bench_values(move |(mempool, tx_hash)| { - runtime.block_on(async { - mempool.remove(tx_hash).await; - }); - }); -} - -/// Benchmarks `Mempool::track_removal_comet_bft` for a single new transaction on a mempool with -/// the `comet_bft_removal_cache` filled. -/// -/// Note that the number of entries in the main cache is irrelevant here. -#[divan::bench(max_time = MAX_TIME)] -fn track_removal_comet_bft(bencher: divan::Bencher) { +fn remove_tx_invalid(bencher: divan::Bencher) { let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); bencher .with_inputs(|| { - let tx_hash = transactions().first().unwrap().sha256_of_proto_encoding(); - (init_mempool::(), tx_hash) + let signed_tx = transactions() + .get(T::checked_size().saturating_sub(1)) + .cloned() + .unwrap(); + (init_mempool::(), signed_tx) }) - .bench_values(move |(mempool, tx_hash)| { + .bench_values(move |(mempool, signed_tx)| { runtime.block_on(async { mempool - .track_removal_comet_bft(tx_hash, RemovalReason::Expired) + .remove_tx_invalid(signed_tx, RemovalReason::Expired) .await; }); }); @@ -313,7 +309,7 @@ fn run_maintenance(bencher: divan::Bencher) { .build() .unwrap(); // Set the new nonce so that the entire `REMOVAL_CACHE_SIZE` entries in the - // `comet_bft_removal_cache` are replaced (assuming this test case has enough txs). + // `comet_bft_removal_cache` are filled (assuming this test case has enough txs). // allow: this is test-only code, using small values, and where the result is not critical. 
#[allow(clippy::arithmetic_side_effects, clippy::cast_possible_truncation)] let new_nonce = (super::REMOVAL_CACHE_SIZE as u32 / u32::from(SIGNER_COUNT)) + 1; @@ -325,10 +321,7 @@ fn run_maintenance(bencher: divan::Bencher) { .with_inputs(|| init_mempool::()) .bench_values(move |mempool| { runtime.block_on(async { - mempool - .run_maintenance(current_account_nonce_getter) - .await - .unwrap(); + mempool.run_maintenance(current_account_nonce_getter).await; }); }); } diff --git a/crates/astria-sequencer/src/mempool/mod.rs b/crates/astria-sequencer/src/mempool/mod.rs index 28dc93b1e0..1106d74bef 100644 --- a/crates/astria-sequencer/src/mempool/mod.rs +++ b/crates/astria-sequencer/src/mempool/mod.rs @@ -1,154 +1,58 @@ mod benchmarks; +mod transactions_container; use std::{ - cmp::{ - self, - Ordering, - }, collections::{ HashMap, VecDeque, }, future::Future, num::NonZeroUsize, - sync::{ - Arc, - OnceLock, - }, + sync::Arc, }; -use anyhow::Context; -use astria_core::{ - crypto::SigningKey, - primitive::v1::ADDRESS_LEN, - protocol::transaction::v1alpha1::{ - SignedTransaction, - TransactionParams, - UnsignedTransaction, - }, -}; -use priority_queue::PriorityQueue; +use astria_core::protocol::transaction::v1alpha1::SignedTransaction; use tokio::{ - sync::RwLock, - time::{ - Duration, - Instant, + join, + sync::{ + RwLock, + RwLockWriteGuard, }, + time::Duration, }; use tracing::{ - debug, + error, instrument, }; +pub(crate) use transactions_container::InsertionError; +use transactions_container::{ + ParkedTransactions, + PendingTransactions, + TimemarkedTransaction, +}; -type MempoolQueue = PriorityQueue; - -/// Used to prioritize transactions in the mempool. -/// -/// The priority is calculated as the difference between the transaction nonce and the current -/// account nonce. The lower the difference, the higher the priority. 
-#[derive(Clone, Debug)] -pub(crate) struct TransactionPriority { - nonce_diff: u32, - time_first_seen: Instant, -} - -impl PartialEq for TransactionPriority { - fn eq(&self, other: &Self) -> bool { - self.nonce_diff == other.nonce_diff - } -} - -impl Eq for TransactionPriority {} - -impl Ord for TransactionPriority { - fn cmp(&self, other: &Self) -> Ordering { - // we want to execute the lowest nonce first, - // so lower nonce difference means higher priority - self.nonce_diff.cmp(&other.nonce_diff).reverse() - } -} - -impl PartialOrd for TransactionPriority { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -#[derive(Clone, Debug)] -pub(crate) struct EnqueuedTransaction { - tx_hash: [u8; 32], - signed_tx: Arc, -} - -impl EnqueuedTransaction { - fn new(signed_tx: SignedTransaction) -> Self { - Self { - tx_hash: signed_tx.sha256_of_proto_encoding(), - signed_tx: Arc::new(signed_tx), - } - } - - fn priority( - &self, - current_account_nonce: u32, - time_first_seen: Option, - ) -> anyhow::Result { - let Some(nonce_diff) = self.signed_tx.nonce().checked_sub(current_account_nonce) else { - return Err(anyhow::anyhow!( - "transaction nonce {} is less than current account nonce {current_account_nonce}", - self.signed_tx.nonce() - )); - }; - - Ok(TransactionPriority { - nonce_diff, - time_first_seen: time_first_seen.unwrap_or(Instant::now()), - }) - } - - pub(crate) fn tx_hash(&self) -> [u8; 32] { - self.tx_hash - } - - pub(crate) fn signed_tx(&self) -> Arc { - self.signed_tx.clone() - } - - pub(crate) fn address_bytes(&self) -> [u8; 20] { - self.signed_tx.address_bytes() - } -} - -/// Only consider `self.tx_hash` for equality. This is consistent with the impl for std `Hash`. -impl PartialEq for EnqueuedTransaction { - fn eq(&self, other: &Self) -> bool { - self.tx_hash == other.tx_hash - } -} - -impl Eq for EnqueuedTransaction {} - -/// Only consider `self.tx_hash` when hashing. This is consistent with the impl for equality. 
-impl std::hash::Hash for EnqueuedTransaction { - fn hash(&self, state: &mut H) { - self.tx_hash.hash(state); - } -} - -#[derive(Debug, Clone)] +#[derive(Debug, Eq, PartialEq, Clone)] pub(crate) enum RemovalReason { Expired, + NonceStale, + LowerNonceInvalidated, FailedPrepareProposal(String), + FailedCheckTx(String), } -const TX_TTL: Duration = Duration::from_secs(600); // 10 minutes +/// How long transactions are considered valid in the mempool. +const TX_TTL: Duration = Duration::from_secs(240); +/// Max number of parked transactions allowed per account. +const MAX_PARKED_TXS_PER_ACCOUNT: usize = 15; +/// Max number of transactions to keep in the removal cache. Should be larger than the max number of +/// transactions allowed in the cometBFT mempool. const REMOVAL_CACHE_SIZE: usize = 4096; /// `RemovalCache` is used to signal to `CometBFT` that a /// transaction can be removed from the `CometBFT` mempool. /// -/// This is useful for when a transaction fails execution or when a transaction -/// has expired in the app's mempool. +/// This is useful for when a transaction fails execution or when +/// a transaction is invalidated due to mempool removal policies. #[derive(Clone)] pub(crate) struct RemovalCache { cache: HashMap<[u8; 32], RemovalReason>, @@ -165,13 +69,14 @@ impl RemovalCache { } } - /// returns Some(RemovalReason) if transaction is cached and - /// removes the entry from the cache at the same time + /// Returns Some(RemovalReason) if the transaction is cached and + /// removes the entry from the cache if present. fn remove(&mut self, tx_hash: [u8; 32]) -> Option { self.cache.remove(&tx_hash) } - /// adds the transaction to the cache + /// Adds the transaction to the cache, will preserve the original + /// `RemovalReason` if already in the cache. 
fn add(&mut self, tx_hash: [u8; 32], reason: RemovalReason) { if self.cache.contains_key(&tx_hash) { return; @@ -191,590 +96,510 @@ impl RemovalCache { } } -/// [`Mempool`] is an internally-synchronized wrapper around a prioritized queue of transactions -/// awaiting execution. +/// [`Mempool`] is an account-based structure for maintaining transactions for execution. +/// +/// The transactions are split between pending and parked, where pending transactions are ready for +/// execution and parked transactions could be executable in the future. /// -/// The priority is calculated as the difference between the transaction nonce and the current -/// account nonce. The lower the difference, the higher the priority. +/// The mempool exposes the pending transactions through `builder_queue()`, which returns a copy of +/// all pending transactions sorted in the order in which they should be executed. The sort order +/// is firstly by the difference between the transaction nonce and the account's current nonce +/// (ascending), and then by time first seen (ascending). +/// +/// The mempool implements the following policies: +/// 1. Nonce replacement is not allowed. +/// 2. Accounts cannot have more than `MAX_PARKED_TXS_PER_ACCOUNT` transactions in their parked +/// queues. +/// 3. There is no account limit on pending transactions. +/// 4. Transactions will expire and can be removed after `TX_TTL` time. +/// 5. If an account has a transaction removed for being invalid or expired, all transactions for +/// that account with a higher nonce will be removed as well. This is due to the fact that we do +/// not execute failing transactions, so a transaction 'failing' will mean that further account +/// nonces will not be able to execute either. 
/// /// Future extensions to this mempool can include: /// - maximum mempool size -/// - fee-based ordering -/// - transaction expiration +/// - account balance aware pending queue #[derive(Clone)] pub(crate) struct Mempool { - queue: Arc>, + pending: Arc>, + parked: Arc>>, comet_bft_removal_cache: Arc>, - tx_ttl: Duration, } impl Mempool { #[must_use] pub(crate) fn new() -> Self { Self { - queue: Arc::new(RwLock::new(MempoolQueue::new())), + pending: Arc::new(RwLock::new(PendingTransactions::new(TX_TTL))), + parked: Arc::new(RwLock::new(ParkedTransactions::new(TX_TTL))), comet_bft_removal_cache: Arc::new(RwLock::new(RemovalCache::new( NonZeroUsize::try_from(REMOVAL_CACHE_SIZE) .expect("Removal cache cannot be zero sized"), ))), - tx_ttl: TX_TTL, } } - /// returns the number of transactions in the mempool + /// Returns the number of transactions in the mempool. #[must_use] #[instrument(skip_all)] pub(crate) async fn len(&self) -> usize { - self.queue.read().await.len() + #[rustfmt::skip] + let (pending_len, parked_len) = join!( + async { self.pending.read().await.len() }, + async { self.parked.read().await.len() } + ); + pending_len.saturating_add(parked_len) } - /// inserts a transaction into the mempool - /// - /// note: the oldest timestamp from found priorities is maintained. + /// Inserts a transaction into the mempool and does not allow for transaction replacement. + /// Will return the reason for insertion failure if failure occurs. 
#[instrument(skip_all)] pub(crate) async fn insert( &self, - tx: SignedTransaction, + tx: Arc, current_account_nonce: u32, - ) -> anyhow::Result<()> { - let enqueued_tx = EnqueuedTransaction::new(tx); - let fresh_priority = enqueued_tx.priority(current_account_nonce, None)?; - Self::update_or_insert(&mut *self.queue.write().await, enqueued_tx, &fresh_priority); - - Ok(()) + ) -> anyhow::Result<(), InsertionError> { + let timemarked_tx = TimemarkedTransaction::new(tx); + + let (mut pending, mut parked) = self.acquire_both_locks().await; + + // try insert into pending (will fail if nonce is gapped or already present) + match pending.add(timemarked_tx.clone(), current_account_nonce) { + Err(InsertionError::NonceGap) => { + // Release the lock asap. + drop(pending); + // try to add to parked queue + parked.add(timemarked_tx, current_account_nonce) + } + error @ Err( + InsertionError::AlreadyPresent + | InsertionError::NonceTooLow + | InsertionError::NonceTaken + | InsertionError::AccountSizeLimit, + ) => error, + Ok(()) => { + // check parked for txs able to be promoted + let to_promote = parked.pop_front_account( + timemarked_tx.address(), + timemarked_tx + .nonce() + .checked_add(1) + .expect("failed to increment nonce in promotion"), + ); + // Release the lock asap. + drop(parked); + for ttx in to_promote { + if let Err(error) = pending.add(ttx, current_account_nonce) { + error!( + current_account_nonce, + "failed to promote transaction during insertion: {error:#}" + ); + } + } + Ok(()) + } + } } - /// inserts all the given transactions into the mempool - /// - /// note: the oldest timestamp from found priorities for an `EnqueuedTransaction` is maintained. 
- #[instrument(skip_all)] - pub(crate) async fn insert_all(&self, txs: Vec<(EnqueuedTransaction, TransactionPriority)>) { - let mut queue = self.queue.write().await; - - for (enqueued_tx, priority) in txs { - Self::update_or_insert(&mut queue, enqueued_tx, &priority); - } + /// Returns a copy of all transactions and their hashes ready for execution, sorted first by the + /// difference between a transaction and the account's current nonce and then by the time that + /// the transaction was first seen by the appside mempool. + pub(crate) async fn builder_queue( + &self, + current_account_nonce_getter: F, + ) -> anyhow::Result)>> + where + F: Fn([u8; 20]) -> O, + O: Future>, + { + self.pending + .read() + .await + .builder_queue(current_account_nonce_getter) + .await } - /// inserts or updates the transaction in a timestamp preserving manner + /// Removes the target transaction and all transactions for associated account with higher + /// nonces. /// - /// note: updates the priority using the `possible_priority`'s nonce diff. - fn update_or_insert( - queue: &mut PriorityQueue, - enqueued_tx: EnqueuedTransaction, - possible_priority: &TransactionPriority, + /// This function should only be used to remove invalid/failing transactions and not executed + /// transactions. Executed transactions will be removed in the `run_maintenance()` function. + pub(crate) async fn remove_tx_invalid( + &self, + signed_tx: Arc, + reason: RemovalReason, ) { - let oldest_timestamp = queue.get_priority(&enqueued_tx).map_or( - possible_priority.time_first_seen, - |prev_priority| { - possible_priority - .time_first_seen - .min(prev_priority.time_first_seen) - }, - ); - - let priority = TransactionPriority { - nonce_diff: possible_priority.nonce_diff, - time_first_seen: oldest_timestamp, + let tx_hash = signed_tx.sha256_of_proto_encoding(); + let address = signed_tx.verification_key().address_bytes(); + + // Try to remove from pending. 
+ let removed_txs = match self.pending.write().await.remove(signed_tx) { + Ok(mut removed_txs) => { + // Remove all of parked. + removed_txs.append(&mut self.parked.write().await.clear_account(&address)); + removed_txs + } + Err(signed_tx) => { + // Not found in pending, try to remove from parked and if not found, just return. + match self.parked.write().await.remove(signed_tx) { + Ok(removed_txs) => removed_txs, + Err(_) => return, + } + } }; - let tx_hash = enqueued_tx.tx_hash; - if queue.push(enqueued_tx, priority).is_none() { - // emit if didn't already exist - tracing::trace!( - tx_hash = %telemetry::display::hex(&tx_hash), - "inserted transaction into mempool" - ); + // Add all removed to removal cache for cometbft. + let mut removal_cache = self.comet_bft_removal_cache.write().await; + // Add the original tx first, since it will also be listed in `removed_txs`. The second + // attempt to add it inside the loop below will be a no-op. + removal_cache.add(tx_hash, reason); + for removed_tx in removed_txs { + removal_cache.add(removed_tx, RemovalReason::LowerNonceInvalidated); } } - /// pops the transaction with the highest priority from the mempool - #[must_use] - #[instrument(skip_all)] - pub(crate) async fn pop(&self) -> Option<(EnqueuedTransaction, TransactionPriority)> { - self.queue.write().await.pop() - } - - /// removes a transaction from the mempool - #[instrument(skip_all)] - pub(crate) async fn remove(&self, tx_hash: [u8; 32]) { - let signed_tx = dummy_signed_tx(); - let enqueued_tx = EnqueuedTransaction { - tx_hash, - signed_tx, - }; - self.queue.write().await.remove(&enqueued_tx); - } - - /// signal that the transaction should be removed from the `CometBFT` mempool - #[instrument(skip_all)] - pub(crate) async fn track_removal_comet_bft(&self, tx_hash: [u8; 32], reason: RemovalReason) { - self.comet_bft_removal_cache - .write() - .await - .add(tx_hash, reason); - } - - /// checks if a transaction was flagged to be removed from the `CometBFT` mempool 
- /// and removes entry + /// Checks if a transaction was flagged to be removed from the `CometBFT` mempool. Will + /// remove the transaction from the cache if it is present. #[instrument(skip_all)] pub(crate) async fn check_removed_comet_bft(&self, tx_hash: [u8; 32]) -> Option<RemovalReason> { self.comet_bft_removal_cache.write().await.remove(tx_hash) } - /// Updates the priority of the txs in the mempool based on the current state, and removes any - /// that are now invalid. + /// Updates stored transactions to reflect current blockchain state. Will remove transactions + /// that have stale nonces and will remove transactions that are expired. /// - /// *NOTE*: this function locks the mempool until every tx has been checked. This could - /// potentially stall consensus from moving to the next round if the mempool is large. + /// All removed transactions are added to the CometBFT removal cache to aid with CometBFT + /// mempool maintenance. #[instrument(skip_all)] - pub(crate) async fn run_maintenance<F, O>( - &self, - current_account_nonce_getter: F, - ) -> anyhow::Result<()> + pub(crate) async fn run_maintenance<F, O>(&self, current_account_nonce_getter: F) where - F: Fn([u8; ADDRESS_LEN]) -> O, + F: Fn([u8; 20]) -> O, O: Future<Output = anyhow::Result<u32>>, { - let mut txs_to_remove = Vec::new(); - let mut current_account_nonces = HashMap::new(); - - let mut queue = self.queue.write().await; - let mut removal_cache = self.comet_bft_removal_cache.write().await; - for (enqueued_tx, priority) in queue.iter_mut() { - let address_bytes = enqueued_tx.address_bytes(); - - // check if the transactions has expired - if priority.time_first_seen.elapsed() > self.tx_ttl { - // tx has expired, set to remove and add to removal cache - txs_to_remove.push(enqueued_tx.clone()); - removal_cache.add(enqueued_tx.tx_hash, RemovalReason::Expired); - continue; + let (mut pending, mut parked) = self.acquire_both_locks().await; + + // clean accounts of stale and expired transactions + let mut removed_txs = 
pending.clean_accounts(&current_account_nonce_getter).await; + removed_txs.append(&mut parked.clean_accounts(&current_account_nonce_getter).await); + + // run promotion logic in case transactions not in this mempool advanced account state + let to_promote = parked.find_promotables(&current_account_nonce_getter).await; + // Release the lock asap. + drop(parked); + for (ttx, current_account_nonce) in to_promote { + if let Err(error) = pending.add(ttx, current_account_nonce) { + error!( + current_account_nonce, + "failed to promote transaction during maintenance: {error:#}" + ); } - - // Try to get the current account nonce from the ones already retrieved. - let current_account_nonce = if let Some(nonce) = - current_account_nonces.get(&address_bytes) - { - *nonce - } else { - // Fall back to getting via the getter and adding it to the local temp collection. - let nonce = current_account_nonce_getter(enqueued_tx.address_bytes()) - .await - .context("failed to fetch account nonce")?; - current_account_nonces.insert(address_bytes, nonce); - nonce - }; - match enqueued_tx.priority(current_account_nonce, Some(priority.time_first_seen)) { - Ok(new_priority) => *priority = new_priority, - Err(e) => { - debug!( - transaction_hash = %telemetry::display::base64(&enqueued_tx.tx_hash), - error = AsRef::<str>::as_ref(&e), - "account nonce is now greater than tx nonce; dropping tx from mempool", - ); - txs_to_remove.push(enqueued_tx.clone()); - } - }; } - for enqueued_tx in txs_to_remove { - queue.remove(&enqueued_tx); + // add to removal cache for cometbft + let mut removal_cache = self.comet_bft_removal_cache.write().await; + for (tx_hash, reason) in removed_txs { + removal_cache.add(tx_hash, reason); } - - Ok(()) } - /// returns the pending nonce for the given address, - /// if it exists in the mempool. + /// Returns the highest pending nonce for the given address if it exists in the mempool. Note: + /// does not take into account gapped nonces in the parked queue. 
For example, if the + /// pending queue for an account has nonces [0,1] and the parked queue has [3], [1] will be + /// returned. #[instrument(skip_all)] - pub(crate) async fn pending_nonce(&self, address: [u8; ADDRESS_LEN]) -> Option { - let inner = self.queue.read().await; - let mut nonce = None; - for (tx, _priority) in inner.iter() { - if tx.address_bytes() == address { - nonce = Some(cmp::max(nonce.unwrap_or_default(), tx.signed_tx.nonce())); - } - } - nonce + pub(crate) async fn pending_nonce(&self, address: [u8; 20]) -> Option { + self.pending.read().await.pending_nonce(address) } -} -/// This exists to provide a `SignedTransaction` for the purposes of removing an entry from the -/// queue where we only have the tx hash available. -/// -/// The queue is indexed by `EnqueuedTransaction` which internally needs a `SignedTransaction`, but -/// this `signed_tx` field is ignored in the `PartialEq` and `Hash` impls of `EnqueuedTransaction` - -/// only the tx hash is considered. So we create an `EnqueuedTransaction` on the fly with the -/// correct tx hash and this dummy signed tx when removing from the queue. 
-fn dummy_signed_tx() -> Arc { - static TX: OnceLock> = OnceLock::new(); - let signed_tx = TX.get_or_init(|| { - let actions = vec![]; - let params = TransactionParams::builder() - .nonce(0) - .chain_id("dummy") - .build(); - let signing_key = SigningKey::from([0; 32]); - let unsigned_tx = UnsignedTransaction { - actions, - params, - }; - Arc::new(unsigned_tx.into_signed(&signing_key)) - }); - signed_tx.clone() + async fn acquire_both_locks( + &self, + ) -> ( + RwLockWriteGuard, + RwLockWriteGuard>, + ) { + let pending = self.pending.write().await; + let parked = self.parked.write().await; + (pending, parked) + } } #[cfg(test)] mod test { - use std::{ - hash::{ - Hash, - Hasher, - }, - time::Duration, - }; + use astria_core::crypto::SigningKey; use super::*; - use crate::app::test_utils::get_mock_tx; - - #[test] - fn transaction_priority_should_error_if_invalid() { - let enqueued_tx = EnqueuedTransaction::new(get_mock_tx(0)); - let priority = enqueued_tx.priority(1, None); - assert!( - priority - .unwrap_err() - .to_string() - .contains("less than current account nonce") - ); - } - - // From https://doc.rust-lang.org/std/cmp/trait.PartialOrd.html - #[test] - // allow: we want explicit assertions here to match the documented expected behavior. - #[allow(clippy::nonminimal_bool)] - fn transaction_priority_comparisons_should_be_consistent() { - let high = TransactionPriority { - nonce_diff: 0, - time_first_seen: Instant::now(), - }; - let low = TransactionPriority { - nonce_diff: 1, - time_first_seen: Instant::now(), - }; - - assert!(high.partial_cmp(&high) == Some(Ordering::Equal)); - assert!(high.partial_cmp(&low) == Some(Ordering::Greater)); - assert!(low.partial_cmp(&high) == Some(Ordering::Less)); - - // 1. a == b if and only if partial_cmp(a, b) == Some(Equal) - assert!(high == high); // Some(Equal) - assert!(!(high == low)); // Some(Greater) - assert!(!(low == high)); // Some(Less) - - // 2. 
a < b if and only if partial_cmp(a, b) == Some(Less) - assert!(low < high); // Some(Less) - assert!(!(high < high)); // Some(Equal) - assert!(!(high < low)); // Some(Greater) - - // 3. a > b if and only if partial_cmp(a, b) == Some(Greater) - assert!(high > low); // Some(Greater) - assert!(!(high > high)); // Some(Equal) - assert!(!(low > high)); // Some(Less) - - // 4. a <= b if and only if a < b || a == b - assert!(low <= high); // a < b - assert!(high <= high); // a == b - assert!(!(high <= low)); // a > b - - // 5. a >= b if and only if a > b || a == b - assert!(high >= low); // a > b - assert!(high >= high); // a == b - assert!(!(low >= high)); // a < b - - // 6. a != b if and only if !(a == b) - assert!(high != low); // asserted !(high == low) above - assert!(low != high); // asserted !(low == high) above - assert!(!(high != high)); // asserted high == high above - } - - #[test] - // From https://doc.rust-lang.org/std/hash/trait.Hash.html#hash-and-eq - fn enqueued_tx_hash_and_eq_should_be_consistent() { - // Check enqueued txs compare equal if and only if their tx hashes are equal. - let tx0 = EnqueuedTransaction { - tx_hash: [0; 32], - signed_tx: Arc::new(get_mock_tx(0)), - }; - let other_tx0 = EnqueuedTransaction { - tx_hash: [0; 32], - signed_tx: Arc::new(get_mock_tx(1)), - }; - let tx1 = EnqueuedTransaction { - tx_hash: [1; 32], - signed_tx: Arc::new(get_mock_tx(0)), - }; - assert!(tx0 == other_tx0); - assert!(tx0 != tx1); - - // Check enqueued txs' std hashes compare equal if and only if their tx hashes are equal. 
- let std_hash = |enqueued_tx: &EnqueuedTransaction| -> u64 { - let mut hasher = std::hash::DefaultHasher::new(); - enqueued_tx.hash(&mut hasher); - hasher.finish() - }; - assert!(std_hash(&tx0) == std_hash(&other_tx0)); - assert!(std_hash(&tx0) != std_hash(&tx1)); - } + use crate::app::test_utils::mock_tx; #[tokio::test] - async fn should_insert_and_pop() { + async fn insert() { let mempool = Mempool::new(); + let signing_key = SigningKey::from([1; 32]); - // Priority 0 (highest priority). - let tx0 = get_mock_tx(0); - mempool.insert(tx0.clone(), 0).await.unwrap(); - - // Priority 1. - let tx1 = get_mock_tx(1); - mempool.insert(tx1.clone(), 0).await.unwrap(); + // sign and insert nonce 1 + let tx1 = mock_tx(1, &signing_key, "test"); + assert!( + mempool.insert(tx1.clone(), 0).await.is_ok(), + "should be able to insert nonce 1 transaction into mempool" + ); - assert_eq!(mempool.len().await, 2); + // try to insert again + assert_eq!( + mempool.insert(tx1.clone(), 0).await.unwrap_err(), + InsertionError::AlreadyPresent, + "already present" + ); - // Should pop priority 0 first. - let (tx, priority) = mempool.pop().await.unwrap(); + // try to replace nonce + let tx1_replacement = mock_tx(1, &signing_key, "test_0"); assert_eq!( - tx.signed_tx.sha256_of_proto_encoding(), - tx0.sha256_of_proto_encoding() + mempool + .insert(tx1_replacement.clone(), 0) + .await + .unwrap_err(), + InsertionError::NonceTaken, + "nonce replace not allowed" ); - assert_eq!(priority.nonce_diff, 0); - assert_eq!(mempool.len().await, 1); - // Should pop priority 1 second. 
- let (tx, priority) = mempool.pop().await.unwrap(); + // add too low nonce + let tx0 = mock_tx(0, &signing_key, "test"); assert_eq!( - tx.signed_tx.sha256_of_proto_encoding(), - tx1.sha256_of_proto_encoding() + mempool.insert(tx0.clone(), 1).await.unwrap_err(), + InsertionError::NonceTooLow, + "nonce too low" ); - assert_eq!(priority.nonce_diff, 1); - assert_eq!(mempool.len().await, 0); } #[tokio::test] - async fn should_remove() { - let mempool = Mempool::new(); - let tx_count = 5_usize; - - let current_account_nonce = 0; - let txs: Vec<_> = (0..tx_count) - .map(|index| { - let enqueued_tx = - EnqueuedTransaction::new(get_mock_tx(u32::try_from(index).unwrap())); - let priority = enqueued_tx.priority(current_account_nonce, None).unwrap(); - (enqueued_tx, priority) - }) - .collect(); - mempool.insert_all(txs.clone()).await; - assert_eq!(mempool.len().await, tx_count); - - // Remove the last tx. - let last_tx_hash = txs.last().unwrap().0.tx_hash; - mempool.remove(last_tx_hash).await; - let mut expected_remaining_count = tx_count.checked_sub(1).unwrap(); - assert_eq!(mempool.len().await, expected_remaining_count); - - // Removing it again should have no effect. - mempool.remove(last_tx_hash).await; - assert_eq!(mempool.len().await, expected_remaining_count); - - // Remove the first tx. - mempool.remove(txs.first().unwrap().0.tx_hash).await; - expected_remaining_count = expected_remaining_count.checked_sub(1).unwrap(); - assert_eq!(mempool.len().await, expected_remaining_count); - - // Check the next tx popped is the second priority. - let (tx, priority) = mempool.pop().await.unwrap(); - assert_eq!(tx.tx_hash, txs[1].0.tx_hash()); - assert_eq!(priority.nonce_diff, 1); - } + async fn single_account_flow_extensive() { + // This test tries to hit the more complex edges of the mempool with a single account. + // The test adds the nonces [1,2,0,4], creates a builder queue with the account + // nonce at 1, and then cleans the pool to nonce 4. 
This tests some of the + // odder edge cases that can be hit if a node goes offline or fails to see + // some transactions that other nodes include into their proposed blocks. - #[tokio::test] - async fn should_update_priorities() { let mempool = Mempool::new(); + let signing_key = SigningKey::from([1; 32]); + let signing_address = signing_key.verification_key().address_bytes(); - // Insert txs signed by alice with nonces 0 and 1. - mempool.insert(get_mock_tx(0), 0).await.unwrap(); - mempool.insert(get_mock_tx(1), 0).await.unwrap(); - - // Insert txs from a different signer with nonces 100 and 102. - let other = SigningKey::from([1; 32]); - let other_mock_tx = |nonce: u32| -> SignedTransaction { - let actions = get_mock_tx(0).actions().to_vec(); - UnsignedTransaction { - params: TransactionParams::builder() - .nonce(nonce) - .chain_id("test") - .build(), - actions, - } - .into_signed(&other) - }; - mempool.insert(other_mock_tx(100), 0).await.unwrap(); - mempool.insert(other_mock_tx(102), 0).await.unwrap(); + // add nonces in odd order to trigger insertion promotion logic + // sign and insert nonce 1 + let tx1 = mock_tx(1, &signing_key, "test"); + assert!( + mempool.insert(tx1.clone(), 0).await.is_ok(), + "should be able to insert nonce 1 transaction into mempool" + ); - assert_eq!(mempool.len().await, 4); + // sign and insert nonce 2 + let tx2 = mock_tx(2, &signing_key, "test"); + assert!( + mempool.insert(tx2.clone(), 0).await.is_ok(), + "should be able to insert nonce 2 transaction into mempool" + ); - let alice = crate::app::test_utils::get_alice_signing_key(); + // sign and insert nonce 0 + let tx0 = mock_tx(0, &signing_key, "test"); + assert!( + mempool.insert(tx0.clone(), 0).await.is_ok(), + "should be able to insert nonce 0 transaction into mempool" + ); - // Create a getter fn which will returns 1 for alice's current account nonce, and 101 for - // the other signer's. 
- let current_account_nonce_getter = |address: [u8; ADDRESS_LEN]| { - let alice = alice.clone(); - let other = other.clone(); - async move { - if address == alice.address_bytes() { - return Ok(1); - } - if address == other.address_bytes() { - return Ok(101); - } - Err(anyhow::anyhow!("invalid address")) + // sign and insert nonce 4 + let tx4 = mock_tx(4, &signing_key, "test"); + assert!( + mempool.insert(tx4.clone(), 0).await.is_ok(), + "should be able to insert nonce 4 transaction into mempool" + ); + + // assert size + assert_eq!(mempool.len().await, 4); + + // mock nonce getter with nonce at 1 + let current_account_nonce_getter = |address: [u8; 20]| async move { + if address == signing_address { + return Ok(1); } + Err(anyhow::anyhow!("invalid address")) }; - // Update the priorities. Alice's first tx (with nonce 0) and other's first (with nonce - // 100) should both get purged. - mempool - .run_maintenance(current_account_nonce_getter) + // grab building queue, should return transactions [1,2] since [0] was below and [4] is + // gapped + let builder_queue = mempool + .builder_queue(current_account_nonce_getter) .await - .unwrap(); + .expect("failed to get builder queue"); + + // see contains first two transactions that should be pending + assert_eq!(builder_queue[0].1.nonce(), 1, "nonce should be one"); + assert_eq!(builder_queue[1].1.nonce(), 2, "nonce should be two"); + + // see mempool's transactions just cloned, not consumed + assert_eq!(mempool.len().await, 4); - assert_eq!(mempool.len().await, 2); + // run maintenance with simulated nonce to remove the nonces 0,1,2 and promote 4 from parked + // to pending + let current_account_nonce_getter = |address: [u8; 20]| async move { + if address == signing_address { + return Ok(4); + } + Err(anyhow::anyhow!("invalid address")) + }; + mempool.run_maintenance(current_account_nonce_getter).await; - // Alice's remaining tx should be the highest priority (nonce diff of 1 - 1 == 0). 
- let (tx, priority) = mempool.pop().await.unwrap(); - assert_eq!(tx.signed_tx.nonce(), 1); - assert_eq!(*tx.signed_tx.verification_key(), alice.verification_key()); - assert_eq!(priority.nonce_diff, 0); + // assert mempool at 1 + assert_eq!(mempool.len().await, 1); - // Other's remaining tx should be the highest priority (nonce diff of 102 - 101 == 1). - let (tx, priority) = mempool.pop().await.unwrap(); - assert_eq!(tx.signed_tx.nonce(), 102); - assert_eq!(*tx.signed_tx.verification_key(), other.verification_key()); - assert_eq!(priority.nonce_diff, 1); + // see transaction [4] properly promoted + let mut builder_queue = mempool + .builder_queue(current_account_nonce_getter) + .await + .expect("failed to get builder queue"); + let (_, returned_tx) = builder_queue.pop().expect("should return last transaction"); + assert_eq!(returned_tx.nonce(), 4, "nonce should be four"); } - #[tokio::test(start_paused = true)] - async fn transaction_timestamp_not_overwritten_insert() { + #[tokio::test] + async fn remove_invalid() { let mempool = Mempool::new(); + let signing_key = SigningKey::from([1; 32]); - let insert_time = Instant::now(); - let tx = get_mock_tx(0); - mempool.insert(tx.clone(), 0).await.unwrap(); - - // pass time - tokio::time::advance(Duration::from_secs(60)).await; - assert_eq!( - insert_time.elapsed(), - Duration::from_secs(60), - "time should have advanced" + // sign and insert nonces 0,1 and 3,4,5 + let tx0 = mock_tx(0, &signing_key, "test"); + assert!( + mempool.insert(tx0.clone(), 0).await.is_ok(), + "should be able to insert nonce 0 transaction into mempool" ); - - // re-insert - mempool.insert(tx, 0).await.unwrap(); - - // check that the timestamp was not overwritten in insert() - let (_, tx_priority) = mempool - .pop() - .await - .expect("transaction was added, should exist"); - assert_eq!( - tx_priority.time_first_seen.duration_since(insert_time), - Duration::from_secs(0), - "Tracked time should be the same" + let tx1 = mock_tx(1, &signing_key, 
"test"); + assert!( + mempool.insert(tx1.clone(), 0).await.is_ok(), + "should be able to insert nonce 1 transaction into mempool" ); - } + let tx3 = mock_tx(3, &signing_key, "test"); + assert!( + mempool.insert(tx3.clone(), 0).await.is_ok(), + "should be able to insert nonce 3 transaction into mempool" + ); + let tx4 = mock_tx(4, &signing_key, "test"); + assert!( + mempool.insert(tx4.clone(), 0).await.is_ok(), + "should be able to insert nonce 4 transaction into mempool" + ); + let tx5 = mock_tx(5, &signing_key, "test"); + assert!( + mempool.insert(tx5.clone(), 0).await.is_ok(), + "should be able to insert nonce 5 transaction into mempool" + ); + assert_eq!(mempool.len().await, 5); - #[tokio::test(start_paused = true)] - async fn transaction_timestamp_not_overwritten_insert_all() { - let mempool = Mempool::new(); + let removal_reason = RemovalReason::FailedPrepareProposal("reason".to_string()); - let insert_time = Instant::now(); - let tx = get_mock_tx(0); - mempool.insert(tx.clone(), 0).await.unwrap(); + // remove 4, should remove 4 and 5 + mempool + .remove_tx_invalid(tx4.clone(), removal_reason.clone()) + .await; + assert_eq!(mempool.len().await, 3); - // pass time - tokio::time::advance(Duration::from_secs(60)).await; - assert_eq!( - insert_time.elapsed(), - Duration::from_secs(60), - "time should have advanced" - ); + // remove 4 again is also ok + mempool + .remove_tx_invalid( + tx4.clone(), + RemovalReason::NonceStale, // shouldn't be inserted into removal cache + ) + .await; + assert_eq!(mempool.len().await, 3); + + // remove 1, should remove 1 and 3 + mempool + .remove_tx_invalid(tx1.clone(), removal_reason.clone()) + .await; + assert_eq!(mempool.len().await, 1); - // re-insert with new priority with higher timestamp - let enqueued_tx = EnqueuedTransaction::new(tx); - let new_priority = TransactionPriority { - nonce_diff: 0, - time_first_seen: Instant::now(), - }; - mempool.insert_all(vec![(enqueued_tx, new_priority)]).await; + // remove 0 + mempool + 
.remove_tx_invalid(tx0.clone(), removal_reason.clone()) + .await; + assert_eq!(mempool.len().await, 0); - // check that the timestamp was not overwritten in insert() - let (_, tx_priority) = mempool - .pop() - .await - .expect("transaction was added, should exist"); - assert_eq!( - tx_priority.time_first_seen.duration_since(insert_time), - Duration::from_secs(0), - "Tracked time should be the same" - ); + // assert that all were added to the cometbft removal cache + // and the expected reasons were tracked + assert!(matches!( + mempool + .check_removed_comet_bft(tx0.sha256_of_proto_encoding()) + .await, + Some(RemovalReason::FailedPrepareProposal(_)) + )); + assert!(matches!( + mempool + .check_removed_comet_bft(tx1.sha256_of_proto_encoding()) + .await, + Some(RemovalReason::FailedPrepareProposal(_)) + )); + assert!(matches!( + mempool + .check_removed_comet_bft(tx3.sha256_of_proto_encoding()) + .await, + Some(RemovalReason::LowerNonceInvalidated) + )); + assert!(matches!( + mempool + .check_removed_comet_bft(tx4.sha256_of_proto_encoding()) + .await, + Some(RemovalReason::FailedPrepareProposal(_)) + )); + assert!(matches!( + mempool + .check_removed_comet_bft(tx5.sha256_of_proto_encoding()) + .await, + Some(RemovalReason::LowerNonceInvalidated) + )); } #[tokio::test] async fn should_get_pending_nonce() { let mempool = Mempool::new(); + let signing_key_0 = SigningKey::from([1; 32]); + let signing_key_1 = SigningKey::from([2; 32]); + let signing_key_2 = SigningKey::from([3; 32]); + let signing_address_0 = signing_key_0.verification_key().address_bytes(); + let signing_address_1 = signing_key_1.verification_key().address_bytes(); + let signing_address_2 = signing_key_2.verification_key().address_bytes(); + + // sign and insert nonces 0,1 + let tx0 = mock_tx(0, &signing_key_0, "test"); + assert!( + mempool.insert(tx0.clone(), 0).await.is_ok(), + "should be able to insert nonce 0 transaction into mempool" + ); + let tx1 = mock_tx(1, &signing_key_0, "test"); + assert!( + 
mempool.insert(tx1.clone(), 0).await.is_ok(), + "should be able to insert nonce 1 transaction into mempool" + ); - // Insert txs signed by alice with nonces 0 and 1. - mempool.insert(get_mock_tx(0), 0).await.unwrap(); - mempool.insert(get_mock_tx(1), 0).await.unwrap(); - - // Insert txs from a different signer with nonces 100 and 101. - let other = SigningKey::from([1; 32]); - let other_mock_tx = |nonce: u32| -> SignedTransaction { - let actions = get_mock_tx(0).actions().to_vec(); - UnsignedTransaction { - params: TransactionParams::builder() - .nonce(nonce) - .chain_id("test") - .build(), - actions, - } - .into_signed(&other) - }; - mempool.insert(other_mock_tx(100), 0).await.unwrap(); - mempool.insert(other_mock_tx(101), 0).await.unwrap(); + // sign and insert nonces 100, 101 + let tx100 = mock_tx(100, &signing_key_1, "test"); + assert!( + mempool.insert(tx100.clone(), 100).await.is_ok(), + "should be able to insert nonce 100 transaction into mempool" + ); + let tx101 = mock_tx(101, &signing_key_1, "test"); + assert!( + mempool.insert(tx101.clone(), 100).await.is_ok(), + "should be able to insert nonce 101 transaction into mempool" + ); assert_eq!(mempool.len().await, 4); - // Check the pending nonce for alice is 1 and for the other signer is 101. - let alice = crate::app::test_utils::get_alice_signing_key(); - assert_eq!( - mempool.pending_nonce(alice.address_bytes()).await.unwrap(), - 1 - ); - assert_eq!( - mempool.pending_nonce(other.address_bytes()).await.unwrap(), - 101 - ); + // Check the pending nonces + assert_eq!(mempool.pending_nonce(signing_address_0).await.unwrap(), 1); + assert_eq!(mempool.pending_nonce(signing_address_1).await.unwrap(), 101); - // Check the pending nonce for an address with no enqueued txs is `None`. - assert!(mempool.pending_nonce([1; 20]).await.is_none()); + // Check the pending nonce for an address with no txs is `None`. 
+ assert!(mempool.pending_nonce(signing_address_2).await.is_none()); } #[tokio::test] - async fn tx_cache_size() { + async fn tx_removal_cache() { let mut tx_cache = RemovalCache::new(NonZeroUsize::try_from(2).unwrap()); let tx_0 = [0u8; 32]; @@ -814,10 +639,18 @@ mod test { ); } - #[test] - fn enqueued_transaction_can_be_instantiated() { - // This just tests that the constructor does not fail. - let signed_tx = crate::app::test_utils::get_mock_tx(0); - let _ = EnqueuedTransaction::new(signed_tx); + #[tokio::test] + async fn tx_removal_cache_preserves_first_reason() { + let mut tx_cache = RemovalCache::new(NonZeroUsize::try_from(2).unwrap()); + + let tx_0 = [0u8; 32]; + + tx_cache.add(tx_0, RemovalReason::Expired); + tx_cache.add(tx_0, RemovalReason::LowerNonceInvalidated); + + assert!( + matches!(tx_cache.remove(tx_0), Some(RemovalReason::Expired)), + "first removal reason should be preserved" + ); } } diff --git a/crates/astria-sequencer/src/mempool/transactions_container.rs b/crates/astria-sequencer/src/mempool/transactions_container.rs new file mode 100644 index 0000000000..73fccd94fe --- /dev/null +++ b/crates/astria-sequencer/src/mempool/transactions_container.rs @@ -0,0 +1,1584 @@ +use std::{ + cmp::Ordering, + collections::{ + hash_map, + BTreeMap, + HashMap, + }, + fmt, + future::Future, + mem, + sync::Arc, +}; + +use anyhow::Context; +use astria_core::protocol::transaction::v1alpha1::SignedTransaction; +use tokio::time::{ + Duration, + Instant, +}; +use tracing::error; + +use super::RemovalReason; + +pub(super) type PendingTransactions = TransactionsContainer; +pub(super) type ParkedTransactions = + TransactionsContainer>; + +/// `TimemarkedTransaction` is a wrapper around a signed transaction used to keep track of when that +/// transaction was first seen in the mempool. 
+#[derive(Clone, Debug)] +pub(super) struct TimemarkedTransaction { + signed_tx: Arc, + tx_hash: [u8; 32], + time_first_seen: Instant, + address: [u8; 20], +} + +impl TimemarkedTransaction { + pub(super) fn new(signed_tx: Arc) -> Self { + Self { + tx_hash: signed_tx.sha256_of_proto_encoding(), + address: signed_tx.verification_key().address_bytes(), + signed_tx, + time_first_seen: Instant::now(), + } + } + + fn priority(&self, current_account_nonce: u32) -> anyhow::Result { + let Some(nonce_diff) = self.signed_tx.nonce().checked_sub(current_account_nonce) else { + return Err(anyhow::anyhow!( + "transaction nonce {} is less than current account nonce {current_account_nonce}", + self.signed_tx.nonce() + )); + }; + + Ok(TransactionPriority { + nonce_diff, + time_first_seen: self.time_first_seen, + }) + } + + fn is_expired(&self, now: Instant, ttl: Duration) -> bool { + now.saturating_duration_since(self.time_first_seen) > ttl + } + + pub(super) fn nonce(&self) -> u32 { + self.signed_tx.nonce() + } + + pub(super) fn address(&self) -> &[u8; 20] { + &self.address + } +} + +impl fmt::Display for TimemarkedTransaction { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "tx_hash: {}, address: {}, signer: {}, nonce: {}, chain ID: {}", + telemetry::display::base64(&self.tx_hash), + telemetry::display::base64(&self.address), + self.signed_tx.verification_key(), + self.signed_tx.nonce(), + self.signed_tx.chain_id(), + ) + } +} + +#[derive(Clone, Copy, Debug)] +struct TransactionPriority { + nonce_diff: u32, + time_first_seen: Instant, +} + +impl PartialEq for TransactionPriority { + fn eq(&self, other: &Self) -> bool { + self.nonce_diff == other.nonce_diff && self.time_first_seen == other.time_first_seen + } +} + +impl Eq for TransactionPriority {} + +impl Ord for TransactionPriority { + fn cmp(&self, other: &Self) -> Ordering { + // we want to first order by nonce difference + // lower nonce diff means higher priority + let nonce_diff = 
self.nonce_diff.cmp(&other.nonce_diff).reverse(); + + // then by timestamp if equal + if nonce_diff == Ordering::Equal { + // lower timestamp means higher priority + return self.time_first_seen.cmp(&other.time_first_seen).reverse(); + } + nonce_diff + } +} + +impl PartialOrd for TransactionPriority { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +#[derive(Debug, Eq, PartialEq, Clone)] +pub(crate) enum InsertionError { + AlreadyPresent, + NonceTooLow, + NonceTaken, + NonceGap, + AccountSizeLimit, +} + +impl fmt::Display for InsertionError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + InsertionError::AlreadyPresent => { + write!(f, "transaction already exists in the mempool") + } + InsertionError::NonceTooLow => { + write!(f, "given nonce has already been used previously") + } + InsertionError::NonceTaken => write!(f, "given nonce already exists in the mempool"), + InsertionError::NonceGap => write!(f, "gap in the pending nonce sequence"), + InsertionError::AccountSizeLimit => write!( + f, + "maximum number of pending transactions has been reached for the given account" + ), + } + } +} + +/// Transactions for a single account where the sequence of nonces must not have any gaps. 
+#[derive(Clone, Default, Debug)] +pub(super) struct PendingTransactionsForAccount { + txs: BTreeMap, +} + +impl PendingTransactionsForAccount { + fn highest_nonce(&self) -> Option { + self.txs.last_key_value().map(|(nonce, _)| *nonce) + } +} + +impl TransactionsForAccount for PendingTransactionsForAccount { + fn txs(&self) -> &BTreeMap { + &self.txs + } + + fn txs_mut(&mut self) -> &mut BTreeMap { + &mut self.txs + } + + fn is_at_tx_limit(&self) -> bool { + false + } + + fn is_sequential_nonce_precondition_met( + &self, + ttx: &TimemarkedTransaction, + current_account_nonce: u32, + ) -> bool { + // If the `ttx` nonce is 0, precondition is met iff the current account nonce is also at + // zero + let Some(previous_nonce) = ttx.signed_tx.nonce().checked_sub(1) else { + return current_account_nonce == 0; + }; + + // Precondition is met if the previous nonce is in the existing txs, or if the tx's nonce + // is equal to the account nonce + self.txs().contains_key(&previous_nonce) || ttx.signed_tx.nonce() == current_account_nonce + } +} + +/// Transactions for a single account where gaps are allowed in the sequence of nonces, and with an +/// upper bound on the number of transactions. +#[derive(Clone, Default, Debug)] +pub(super) struct ParkedTransactionsForAccount { + txs: BTreeMap, +} + +impl ParkedTransactionsForAccount { + /// Returns contiguous transactions from front of queue starting from target nonce, removing the + /// transactions in the process. + /// + /// Note: this function only operates on the front of the queue. If the target nonce is not at + /// the front, an error will be logged and nothing will be returned. + fn pop_front_contiguous( + &mut self, + mut target_nonce: u32, + ) -> impl Iterator { + let mut split_at = 0; + for nonce in self.txs.keys() { + if *nonce == target_nonce { + let Some(next_target) = target_nonce.checked_add(1) else { + // We've got contiguous nonces up to `u32::MAX`; return everything. 
+ return mem::take(&mut self.txs).into_values(); + }; + target_nonce = next_target; + split_at = next_target; + } else { + break; + } + } + + if split_at == 0 { + error!(target_nonce, "expected nonce to be present"); + } + + let mut split_off = self.txs.split_off(&split_at); + // The higher nonces are returned in `split_off`, but we want to keep these in `self.txs`, + // so swap the two collections. + mem::swap(&mut split_off, &mut self.txs); + split_off.into_values() + } +} + +impl TransactionsForAccount + for ParkedTransactionsForAccount +{ + fn txs(&self) -> &BTreeMap { + &self.txs + } + + fn txs_mut(&mut self) -> &mut BTreeMap { + &mut self.txs + } + + fn is_at_tx_limit(&self) -> bool { + self.txs.len() >= MAX_TX_COUNT + } + + fn is_sequential_nonce_precondition_met(&self, _: &TimemarkedTransaction, _: u32) -> bool { + true + } +} + +/// `TransactionsForAccount` is a trait for a collection of transactions belonging to a single +/// account. +pub(super) trait TransactionsForAccount: Default { + fn new() -> Self + where + Self: Sized + Default, + { + Self::default() + } + + fn txs(&self) -> &BTreeMap; + + fn txs_mut(&mut self) -> &mut BTreeMap; + + fn is_at_tx_limit(&self) -> bool; + + /// Returns `Ok` if adding `ttx` would not break the nonce precondition, i.e. sequential + /// nonces with no gaps if in `SequentialNonces` mode. + fn is_sequential_nonce_precondition_met( + &self, + ttx: &TimemarkedTransaction, + current_account_nonce: u32, + ) -> bool; + + /// Adds transaction to the container. Note: does NOT allow for nonce replacement. + /// Will fail if in `SequentialNonces` mode and adding the transaction would create a nonce gap. + /// + /// `current_account_nonce` should be the account's nonce in the latest chain state. + /// + /// Note: if the account `current_account_nonce` ever decreases, this is a logic error + /// and could mess up the validity of `SequentialNonces` containers. 
+ fn add( + &mut self, + ttx: TimemarkedTransaction, + current_account_nonce: u32, + ) -> Result<(), InsertionError> { + if self.is_at_tx_limit() { + return Err(InsertionError::AccountSizeLimit); + } + + if ttx.nonce() < current_account_nonce { + return Err(InsertionError::NonceTooLow); + } + + if let Some(existing_ttx) = self.txs().get(&ttx.signed_tx.nonce()) { + return Err(if existing_ttx.tx_hash == ttx.tx_hash { + InsertionError::AlreadyPresent + } else { + InsertionError::NonceTaken + }); + } + + if !self.is_sequential_nonce_precondition_met(&ttx, current_account_nonce) { + return Err(InsertionError::NonceGap); + } + + self.txs_mut().insert(ttx.signed_tx.nonce(), ttx); + + Ok(()) + } + + /// Removes transactions with the given nonce and higher. + /// + /// Note: the given nonce is expected to be present. If it's absent, an error is logged and no + /// transactions are removed. + /// + /// Returns the hashes of the removed transactions. + fn remove(&mut self, nonce: u32) -> Vec<[u8; 32]> { + if !self.txs().contains_key(&nonce) { + error!(nonce, "transaction with given nonce not found"); + return Vec::new(); + } + + self.txs_mut() + .split_off(&nonce) + .values() + .map(|ttx| ttx.tx_hash) + .collect() + } + + /// Returns the transaction with the lowest nonce. + fn front(&self) -> Option<&TimemarkedTransaction> { + self.txs().first_key_value().map(|(_, ttx)| ttx) + } + + /// Removes transactions below the given nonce. Returns the hashes of the removed transactions. + fn register_latest_account_nonce( + &mut self, + current_account_nonce: u32, + ) -> impl Iterator { + let mut split_off = self.txs_mut().split_off(¤t_account_nonce); + mem::swap(&mut split_off, self.txs_mut()); + split_off.into_values().map(|ttx| ttx.tx_hash) + } + + #[cfg(test)] + fn contains_tx(&self, tx_hash: &[u8; 32]) -> bool { + self.txs().values().any(|ttx| ttx.tx_hash == *tx_hash) + } +} + +/// `TransactionsContainer` is a container used for managing transactions for multiple accounts. 
+#[derive(Clone, Debug)] +pub(super) struct TransactionsContainer { + /// A map of collections of transactions, indexed by the account address. + txs: HashMap<[u8; 20], T>, + tx_ttl: Duration, +} + +impl TransactionsContainer { + pub(super) fn new(tx_ttl: Duration) -> Self { + TransactionsContainer:: { + txs: HashMap::new(), + tx_ttl, + } + } + + /// Adds the transaction to the container. + /// + /// `current_account_nonce` should be the current nonce of the account associated with the + /// transaction. If this ever decreases, the `TransactionsContainer` containers could become + /// invalid. + pub(super) fn add( + &mut self, + ttx: TimemarkedTransaction, + current_account_nonce: u32, + ) -> Result<(), InsertionError> { + match self.txs.entry(*ttx.address()) { + hash_map::Entry::Occupied(entry) => { + entry.into_mut().add(ttx, current_account_nonce)?; + } + hash_map::Entry::Vacant(entry) => { + let mut txs = T::new(); + txs.add(ttx, current_account_nonce)?; + entry.insert(txs); + } + } + Ok(()) + } + + /// Removes the given transaction and any transactions with higher nonces for the relevant + /// account. + /// + /// If `signed_tx` existed, returns `Ok` with the hashes of the removed transactions. If + /// `signed_tx` was not in the collection, it is returned via `Err`. + pub(super) fn remove( + &mut self, + signed_tx: Arc, + ) -> Result, Arc> { + let address = signed_tx.verification_key().address_bytes(); + + // Take the collection for this account out of `self` temporarily. + let Some(mut account_txs) = self.txs.remove(&address) else { + return Err(signed_tx); + }; + + let removed = account_txs.remove(signed_tx.nonce()); + + // Re-add the collection to `self` if it's not empty. + if !account_txs.txs().is_empty() { + let _ = self.txs.insert(address, account_txs); + } + + if removed.is_empty() { + return Err(signed_tx); + } + + Ok(removed) + } + + /// Removes all of the transactions for the given account and returns the hashes of the removed + /// transactions. 
+ pub(super) fn clear_account(&mut self, address: &[u8; 20]) -> Vec<[u8; 32]> { + self.txs + .remove(address) + .map(|account_txs| account_txs.txs().values().map(|ttx| ttx.tx_hash).collect()) + .unwrap_or_default() + } + + /// Cleans all of the accounts in the container. Removes any transactions with stale nonces and + /// evicts all transactions from accounts whose lowest transaction has expired. + /// + /// Returns all transactions that have been removed with the reason why they have been removed. + pub(super) async fn clean_accounts( + &mut self, + current_account_nonce_getter: &F, + ) -> Vec<([u8; 32], RemovalReason)> + where + F: Fn([u8; 20]) -> O, + O: Future>, + { + // currently just removes stale nonces and will clear accounts if the + // transactions are older than the TTL + let mut accounts_to_remove = Vec::new(); + let mut removed_txs = Vec::new(); + let now = Instant::now(); + for (address, account_txs) in &mut self.txs { + // check if first tx is older than the TTL, if so, remove all transactions + if let Some(first_tx) = account_txs.front() { + if first_tx.is_expired(now, self.tx_ttl) { + // first is stale, rest popped for invalidation + removed_txs.push((first_tx.tx_hash, RemovalReason::Expired)); + removed_txs.extend( + account_txs + .txs() + .values() + .skip(1) + .map(|ttx| (ttx.tx_hash, RemovalReason::LowerNonceInvalidated)), + ); + account_txs.txs_mut().clear(); + } else { + // clean to newest nonce + let current_account_nonce = match current_account_nonce_getter(*address).await { + Ok(nonce) => nonce, + Err(error) => { + error!( + address = %telemetry::display::base64(address), + "failed to fetch nonce from state when cleaning accounts: {error:#}" + ); + continue; + } + }; + removed_txs.extend( + account_txs + .register_latest_account_nonce(current_account_nonce) + .map(|tx_hash| (tx_hash, RemovalReason::NonceStale)), + ); + } + } + + if account_txs.txs().is_empty() { + accounts_to_remove.push(*address); + } + } + + // remove empty accounts + 
for account in accounts_to_remove { + self.txs.remove(&account); + } + + removed_txs + } + + /// Returns the number of transactions in the container. + pub(super) fn len(&self) -> usize { + self.txs + .values() + .map(|account_txs| account_txs.txs().len()) + .sum() + } + + #[cfg(test)] + fn contains_tx(&self, tx_hash: &[u8; 32]) -> bool { + self.txs + .values() + .any(|account_txs| account_txs.contains_tx(tx_hash)) + } +} + +impl TransactionsContainer { + /// Returns the highest nonce for an account. + pub(super) fn pending_nonce(&self, address: [u8; 20]) -> Option { + self.txs + .get(&address) + .and_then(PendingTransactionsForAccount::highest_nonce) + } + + /// Returns a copy of transactions and their hashes sorted by nonce difference and then time + /// first seen. + pub(super) async fn builder_queue( + &self, + current_account_nonce_getter: F, + ) -> anyhow::Result)>> + where + F: Fn([u8; 20]) -> O, + O: Future>, + { + // Used to hold the values in Vec for sorting. + struct QueueEntry { + tx: Arc, + tx_hash: [u8; 32], + priority: TransactionPriority, + } + + let mut queue = Vec::with_capacity(self.len()); + // Add all transactions to the queue. + for (address, account_txs) in &self.txs { + let current_account_nonce = current_account_nonce_getter(*address) + .await + .context("failed to fetch account nonce for builder queue")?; + for ttx in account_txs.txs.values() { + let priority = match ttx.priority(current_account_nonce) { + Ok(priority) => priority, + Err(error) => { + // mempool could be off due to node connectivity issues + error!( + tx_hash = %telemetry::display::base64(&ttx.tx_hash), + "failed to add pending tx to builder queue: {error:#}" + ); + continue; + } + }; + queue.push(QueueEntry { + tx: ttx.signed_tx.clone(), + tx_hash: ttx.tx_hash, + priority, + }); + } + } + + // Sort the queue and return the relevant data. Note that the sorted queue will be ordered + // from lowest to highest priority, so we need to reverse the order before returning. 
+ queue.sort_unstable_by_key(|entry| entry.priority); + Ok(queue + .into_iter() + .rev() + .map(|entry| (entry.tx_hash, entry.tx)) + .collect()) + } +} + +impl TransactionsContainer> { + /// Removes and returns the transactions from the front of an account, similar to + /// `find_promotables`. Useful for when needing to promote transactions from a specific + /// account instead of all accounts. + pub(super) fn pop_front_account( + &mut self, + account: &[u8; 20], + target_nonce: u32, + ) -> Vec { + // Take the collection for this account out of `self` temporarily. + let Some(mut account_txs) = self.txs.remove(account) else { + return Vec::new(); + }; + + let removed = account_txs.pop_front_contiguous(target_nonce); + + // Re-add the collection to `self` if it's not empty. + if !account_txs.txs().is_empty() { + let _ = self.txs.insert(*account, account_txs); + } + removed.collect() + } + + /// Removes and returns transactions along with their account's current nonce that are lower + /// than or equal to that nonce. This is helpful when needing to promote transactions from + /// parked to pending during mempool maintenance. 
+ pub(super) async fn find_promotables( + &mut self, + current_account_nonce_getter: &F, + ) -> Vec<(TimemarkedTransaction, u32)> + where + F: Fn([u8; 20]) -> O, + O: Future>, + { + let mut accounts_to_remove = Vec::new(); + let mut promoted_txs = Vec::new(); + + for (address, account_txs) in &mut self.txs { + let current_account_nonce = match current_account_nonce_getter(*address).await { + Ok(nonce) => nonce, + Err(error) => { + error!( + address = %telemetry::display::base64(address), + "failed to fetch nonce from state when finding promotables: {error:#}" + ); + continue; + } + }; + + // find transactions that can be promoted + // note: can use current account nonce as target because this logic + // is only handling the case where transactions we didn't have in our + // local mempool were ran that would enable the parked transactions to + // be valid + promoted_txs.extend( + account_txs + .pop_front_contiguous(current_account_nonce) + .map(|ttx| (ttx, current_account_nonce)), + ); + + if account_txs.txs.is_empty() { + accounts_to_remove.push(*address); + } + } + + // remove empty accounts + for account in accounts_to_remove { + self.txs.remove(&account); + } + + promoted_txs + } +} + +#[cfg(test)] +mod test { + use astria_core::crypto::SigningKey; + + use super::*; + use crate::app::test_utils::mock_tx; + + const MAX_PARKED_TXS_PER_ACCOUNT: usize = 15; + const TX_TTL: Duration = Duration::from_secs(2); + + fn mock_ttx(nonce: u32, signer: &SigningKey) -> TimemarkedTransaction { + TimemarkedTransaction::new(mock_tx(nonce, signer, "test")) + } + + #[test] + fn transaction_priority_should_error_if_invalid() { + let ttx = TimemarkedTransaction::new(mock_tx(0, &[1; 32].into(), "test")); + let priority = ttx.priority(1); + + assert!( + priority + .unwrap_err() + .to_string() + .contains("less than current account nonce") + ); + } + + // From https://doc.rust-lang.org/std/cmp/trait.PartialOrd.html + #[test] + // allow: we want explicit assertions here to match the 
documented expected behavior. + #[allow(clippy::nonminimal_bool)] + fn transaction_priority_comparisons_should_be_consistent_nonce_diff() { + let instant = Instant::now(); + + let high = TransactionPriority { + nonce_diff: 0, + time_first_seen: instant, + }; + let low = TransactionPriority { + nonce_diff: 1, + time_first_seen: instant, + }; + + assert!(high.partial_cmp(&high) == Some(Ordering::Equal)); + assert!(high.partial_cmp(&low) == Some(Ordering::Greater)); + assert!(low.partial_cmp(&high) == Some(Ordering::Less)); + + // 1. a == b if and only if partial_cmp(a, b) == Some(Equal) + assert!(high == high); // Some(Equal) + assert!(!(high == low)); // Some(Greater) + assert!(!(low == high)); // Some(Less) + + // 2. a < b if and only if partial_cmp(a, b) == Some(Less) + assert!(low < high); // Some(Less) + assert!(!(high < high)); // Some(Equal) + assert!(!(high < low)); // Some(Greater) + + // 3. a > b if and only if partial_cmp(a, b) == Some(Greater) + assert!(high > low); // Some(Greater) + assert!(!(high > high)); // Some(Equal) + assert!(!(low > high)); // Some(Less) + + // 4. a <= b if and only if a < b || a == b + assert!(low <= high); // a < b + assert!(high <= high); // a == b + assert!(!(high <= low)); // a > b + + // 5. a >= b if and only if a > b || a == b + assert!(high >= low); // a > b + assert!(high >= high); // a == b + assert!(!(low >= high)); // a < b + + // 6. a != b if and only if !(a == b) + assert!(high != low); // asserted !(high == low) above + assert!(low != high); // asserted !(low == high) above + assert!(!(high != high)); // asserted high == high above + } + + // From https://doc.rust-lang.org/std/cmp/trait.PartialOrd.html + #[test] + // allow: we want explicit assertions here to match the documented expected behavior. 
+ #[allow(clippy::nonminimal_bool)] + fn transaction_priority_comparisons_should_be_consistent_time_gap() { + let high = TransactionPriority { + nonce_diff: 0, + time_first_seen: Instant::now(), + }; + let low = TransactionPriority { + nonce_diff: 0, + time_first_seen: Instant::now() + Duration::from_micros(10), + }; + + assert!(high.partial_cmp(&high) == Some(Ordering::Equal)); + assert!(high.partial_cmp(&low) == Some(Ordering::Greater)); + assert!(low.partial_cmp(&high) == Some(Ordering::Less)); + + // 1. a == b if and only if partial_cmp(a, b) == Some(Equal) + assert!(high == high); // Some(Equal) + assert!(!(high == low)); // Some(Greater) + assert!(!(low == high)); // Some(Less) + + // 2. a < b if and only if partial_cmp(a, b) == Some(Less) + assert!(low < high); // Some(Less) + assert!(!(high < high)); // Some(Equal) + assert!(!(high < low)); // Some(Greater) + + // 3. a > b if and only if partial_cmp(a, b) == Some(Greater) + assert!(high > low); // Some(Greater) + assert!(!(high > high)); // Some(Equal) + assert!(!(low > high)); // Some(Less) + + // 4. a <= b if and only if a < b || a == b + assert!(low <= high); // a < b + assert!(high <= high); // a == b + assert!(!(high <= low)); // a > b + + // 5. a >= b if and only if a > b || a == b + assert!(high >= low); // a > b + assert!(high >= high); // a == b + assert!(!(low >= high)); // a < b + + // 6. 
a != b if and only if !(a == b) + assert!(high != low); // asserted !(high == low) above + assert!(low != high); // asserted !(low == high) above + assert!(!(high != high)); // asserted high == high above + } + + #[test] + fn parked_transactions_for_account_add() { + let mut parked_txs = ParkedTransactionsForAccount::::new(); + + // transactions to add + let ttx_1 = mock_ttx(1, &[1; 32].into()); + let ttx_3 = mock_ttx(3, &[1; 32].into()); + let ttx_5 = mock_ttx(5, &[1; 32].into()); + + let current_account_nonce = 2; + parked_txs + .add(ttx_3.clone(), current_account_nonce) + .unwrap(); + assert!(parked_txs.contains_tx(&ttx_3.tx_hash)); + assert_eq!( + parked_txs.add(ttx_3, current_account_nonce).unwrap_err(), + InsertionError::AlreadyPresent + ); + + // add gapped transaction + parked_txs.add(ttx_5, current_account_nonce).unwrap(); + + // fail adding too low nonce + assert_eq!( + parked_txs.add(ttx_1, current_account_nonce).unwrap_err(), + InsertionError::NonceTooLow + ); + } + + #[test] + fn parked_transactions_for_account_size_limit() { + let mut parked_txs = ParkedTransactionsForAccount::<2>::new(); + + // transactions to add + let ttx_1 = mock_ttx(1, &[1; 32].into()); + let ttx_3 = mock_ttx(3, &[1; 32].into()); + let ttx_5 = mock_ttx(5, &[1; 32].into()); + + let current_account_nonce = 0; + parked_txs + .add(ttx_3.clone(), current_account_nonce) + .unwrap(); + parked_txs.add(ttx_5, current_account_nonce).unwrap(); + + // fail with size limit hit + assert_eq!( + parked_txs.add(ttx_1, current_account_nonce).unwrap_err(), + InsertionError::AccountSizeLimit + ); + } + + #[test] + fn pending_transactions_for_account_add() { + let mut pending_txs = PendingTransactionsForAccount::new(); + + // transactions to add + let ttx_0 = mock_ttx(0, &[1; 32].into()); + let ttx_1 = mock_ttx(1, &[1; 32].into()); + let ttx_2 = mock_ttx(2, &[1; 32].into()); + let ttx_3 = mock_ttx(3, &[1; 32].into()); + + let current_account_nonce = 1; + + // too low nonces not added + assert_eq!( + 
pending_txs.add(ttx_0, current_account_nonce).unwrap_err(), + InsertionError::NonceTooLow + ); + assert!(pending_txs.txs().is_empty()); + + // too high nonces with empty container not added + assert_eq!( + pending_txs + .add(ttx_2.clone(), current_account_nonce) + .unwrap_err(), + InsertionError::NonceGap + ); + assert!(pending_txs.txs().is_empty()); + + // add ok + pending_txs + .add(ttx_1.clone(), current_account_nonce) + .unwrap(); + assert_eq!( + pending_txs.add(ttx_1, current_account_nonce).unwrap_err(), + InsertionError::AlreadyPresent + ); + + // gapped transaction not allowed + assert_eq!( + pending_txs.add(ttx_3, current_account_nonce).unwrap_err(), + InsertionError::NonceGap + ); + + // can add consecutive + pending_txs.add(ttx_2, current_account_nonce).unwrap(); + } + + #[test] + fn transactions_for_account_remove() { + let mut account_txs = PendingTransactionsForAccount::new(); + + // transactions to add + let ttx_0 = mock_ttx(0, &[1; 32].into()); + let ttx_1 = mock_ttx(1, &[1; 32].into()); + let ttx_2 = mock_ttx(2, &[1; 32].into()); + let ttx_3 = mock_ttx(3, &[1; 32].into()); + + account_txs.add(ttx_0.clone(), 0).unwrap(); + account_txs.add(ttx_1.clone(), 0).unwrap(); + account_txs.add(ttx_2.clone(), 0).unwrap(); + account_txs.add(ttx_3.clone(), 0).unwrap(); + + // remove from end will only remove end + assert_eq!( + account_txs.remove(3), + vec![ttx_3.tx_hash], + "only one transaction should've been removed" + ); + assert_eq!(account_txs.txs().len(), 3); + + // remove same again return nothing + assert_eq!( + account_txs.remove(3).len(), + 0, + "no transaction should be removed" + ); + assert_eq!(account_txs.txs().len(), 3); + + // remove from start will remove all + assert_eq!( + account_txs.remove(0), + vec![ttx_0.tx_hash, ttx_1.tx_hash, ttx_2.tx_hash,], + "three transactions should've been removed" + ); + assert!(account_txs.txs().is_empty()); + } + + #[test] + fn parked_transactions_for_account_pop_front_contiguous() { + let mut parked_txs = 
ParkedTransactionsForAccount::::new(); + + // transactions to add + let ttx_0 = mock_ttx(0, &[1; 32].into()); + let ttx_2 = mock_ttx(2, &[1; 32].into()); + let ttx_3 = mock_ttx(3, &[1; 32].into()); + let ttx_4 = mock_ttx(4, &[1; 32].into()); + + parked_txs.add(ttx_0.clone(), 0).unwrap(); + parked_txs.add(ttx_2.clone(), 0).unwrap(); + parked_txs.add(ttx_3.clone(), 0).unwrap(); + parked_txs.add(ttx_4.clone(), 0).unwrap(); + + // lowest nonce not target nonce is noop + assert_eq!( + parked_txs.pop_front_contiguous(2).count(), + 0, + "no transaction should've been removed" + ); + assert_eq!(parked_txs.txs().len(), 4); + + // will remove single value + assert_eq!( + parked_txs + .pop_front_contiguous(0) + .map(|ttx| ttx.tx_hash) + .collect::>(), + vec![ttx_0.tx_hash], + "single transaction should've been returned" + ); + assert_eq!(parked_txs.txs().len(), 3); + + // will remove multiple values + assert_eq!( + parked_txs + .pop_front_contiguous(2) + .map(|ttx| ttx.tx_hash) + .collect::>(), + vec![ttx_2.tx_hash, ttx_3.tx_hash, ttx_4.tx_hash], + "multiple transaction should've been returned" + ); + assert!(parked_txs.txs().is_empty()); + } + + #[test] + fn pending_transactions_for_account_highest_nonce() { + let mut pending_txs = PendingTransactionsForAccount::new(); + + // no transactions ok + assert!( + pending_txs.highest_nonce().is_none(), + "no transactions will return None" + ); + + // transactions to add + let ttx_0 = mock_ttx(0, &[1; 32].into()); + let ttx_1 = mock_ttx(1, &[1; 32].into()); + let ttx_2 = mock_ttx(2, &[1; 32].into()); + + pending_txs.add(ttx_0, 0).unwrap(); + pending_txs.add(ttx_1, 0).unwrap(); + pending_txs.add(ttx_2, 0).unwrap(); + + // will return last transaction + assert_eq!( + pending_txs.highest_nonce(), + Some(2), + "highest nonce should be returned" + ); + } + + #[test] + fn transactions_for_account_front() { + let mut parked_txs = ParkedTransactionsForAccount::::new(); + + // no transactions ok + assert!( + parked_txs.front().is_none(), + 
"no transactions will return None" + ); + + // transactions to add + let ttx_0 = mock_ttx(0, &[1; 32].into()); + let ttx_2 = mock_ttx(2, &[1; 32].into()); + + parked_txs.add(ttx_0.clone(), 0).unwrap(); + parked_txs.add(ttx_2, 0).unwrap(); + + // will return first transaction + assert_eq!( + parked_txs.front().unwrap().tx_hash, + ttx_0.tx_hash, + "lowest transaction should be returned" + ); + } + + #[test] + fn transactions_for_account_register_latest_account_nonce() { + let mut parked_txs = ParkedTransactionsForAccount::::new(); + + // transactions to add + let ttx_0 = mock_ttx(0, &[1; 32].into()); + let ttx_2 = mock_ttx(2, &[1; 32].into()); + let ttx_3 = mock_ttx(3, &[1; 32].into()); + let ttx_4 = mock_ttx(4, &[1; 32].into()); + + parked_txs.add(ttx_0.clone(), 0).unwrap(); + parked_txs.add(ttx_2.clone(), 0).unwrap(); + parked_txs.add(ttx_3.clone(), 0).unwrap(); + parked_txs.add(ttx_4.clone(), 0).unwrap(); + + // matching nonce will not be removed + assert_eq!( + parked_txs.register_latest_account_nonce(0).count(), + 0, + "no transaction should've been removed" + ); + assert_eq!(parked_txs.txs().len(), 4); + + // fast forwarding to non existing middle nonce ok + assert_eq!( + parked_txs + .register_latest_account_nonce(1) + .collect::>(), + vec![ttx_0.tx_hash], + "ttx_0 should've been removed" + ); + assert_eq!(parked_txs.txs().len(), 3); + + // fast forwarding to existing nonce ok + assert_eq!( + parked_txs + .register_latest_account_nonce(3) + .collect::>(), + vec![ttx_2.tx_hash], + "one transaction should've been removed" + ); + assert_eq!(parked_txs.txs().len(), 2); + + // fast forwarding to much higher nonce ok + assert_eq!( + parked_txs + .register_latest_account_nonce(10) + .collect::>(), + vec![ttx_3.tx_hash, ttx_4.tx_hash], + "two transactions should've been removed" + ); + assert!(parked_txs.txs().is_empty()); + } + + #[test] + fn transactions_container_add() { + let mut pending_txs = PendingTransactions::new(TX_TTL); + + let signing_key_0 = 
SigningKey::from([1; 32]); + let signing_address_0 = signing_key_0.address_bytes(); + + let signing_key_1 = SigningKey::from([2; 32]); + let signing_address_1 = signing_key_1.address_bytes(); + + // transactions to add to accounts + let ttx_s0_0_0 = mock_ttx(0, &signing_key_0); + // Same nonce and signer as `ttx_s0_0_0`, but different rollup name, hence different tx. + let ttx_s0_0_1 = TimemarkedTransaction::new(mock_tx(0, &signing_key_0, "other")); + let ttx_s0_2_0 = mock_ttx(2, &signing_key_0); + let ttx_s1_0_0 = mock_ttx(0, &signing_key_1); + + // transactions to add for account 1 + + // initially no accounts should exist + assert!( + pending_txs.txs.is_empty(), + "no accounts should exist at first" + ); + + // adding too low nonce shouldn't create account + assert_eq!( + pending_txs.add(ttx_s0_0_0.clone(), 1).unwrap_err(), + InsertionError::NonceTooLow, + "shouldn't be able to add nonce too low transaction" + ); + assert!( + pending_txs.txs.is_empty(), + "failed adds to new accounts shouldn't create account" + ); + + // add one transaction + pending_txs.add(ttx_s0_0_0.clone(), 0).unwrap(); + assert_eq!(pending_txs.txs.len(), 1, "one account should exist"); + + // re-adding transaction should fail + assert_eq!( + pending_txs.add(ttx_s0_0_0, 0).unwrap_err(), + InsertionError::AlreadyPresent, + "re-adding same transaction should fail" + ); + + // nonce replacement fails + assert_eq!( + pending_txs.add(ttx_s0_0_1, 0).unwrap_err(), + InsertionError::NonceTaken, + "nonce replacement not supported" + ); + + // nonce gaps not supported + assert_eq!( + pending_txs.add(ttx_s0_2_0, 0).unwrap_err(), + InsertionError::NonceGap, + "gapped nonces in pending transactions not allowed" + ); + + // add transactions for account 2 + pending_txs.add(ttx_s1_0_0, 0).unwrap(); + + // check internal structures + assert_eq!(pending_txs.txs.len(), 2, "two accounts should exist"); + assert_eq!( + pending_txs.txs.get(&signing_address_0).unwrap().txs().len(), + 1, + "one transaction should 
be in the original account" + ); + assert_eq!( + pending_txs.txs.get(&signing_address_1).unwrap().txs().len(), + 1, + "one transaction should be in the second account" + ); + assert_eq!( + pending_txs.len(), + 2, + "should only have two transactions tracked" + ); + } + + #[test] + fn transactions_container_remove() { + let mut pending_txs = PendingTransactions::new(TX_TTL); + let signing_key_0 = SigningKey::from([1; 32]); + let signing_key_1 = SigningKey::from([2; 32]); + + // transactions to add to accounts + let ttx_s0_0 = mock_ttx(0, &signing_key_0); + let ttx_s0_1 = mock_ttx(1, &signing_key_0); + let ttx_s1_0 = mock_ttx(0, &signing_key_1); + let ttx_s1_1 = mock_ttx(1, &signing_key_1); + + // remove on empty returns the tx in Err variant. + assert!( + pending_txs.remove(ttx_s0_0.signed_tx.clone()).is_err(), + "zero transactions should be removed from non existing accounts" + ); + + // add transactions + pending_txs.add(ttx_s0_0.clone(), 0).unwrap(); + pending_txs.add(ttx_s0_1.clone(), 0).unwrap(); + pending_txs.add(ttx_s1_0.clone(), 0).unwrap(); + pending_txs.add(ttx_s1_1.clone(), 0).unwrap(); + + // remove should remove tx and higher + assert_eq!( + pending_txs.remove(ttx_s0_0.signed_tx.clone()).unwrap(), + vec![ttx_s0_0.tx_hash, ttx_s0_1.tx_hash], + "rest of transactions for account should be removed when targeting bottom nonce" + ); + assert_eq!(pending_txs.txs.len(), 1, "empty account should be removed"); + assert_eq!( + pending_txs.len(), + 2, + "should only have two transactions tracked" + ); + assert!( + pending_txs.contains_tx(&ttx_s1_0.tx_hash), + "other account should be untouched" + ); + assert!( + pending_txs.contains_tx(&ttx_s1_1.tx_hash), + "other account should be untouched" + ); + } + + #[test] + fn transactions_container_clear_account() { + let mut pending_txs = PendingTransactions::new(TX_TTL); + let signing_key_0 = SigningKey::from([1; 32]); + let signing_address_0 = signing_key_0.address_bytes(); + + let signing_key_1 = SigningKey::from([2; 
32]); + + // transactions to add to accounts + let ttx_s0_0 = mock_ttx(0, &signing_key_0); + let ttx_s0_1 = mock_ttx(1, &signing_key_0); + let ttx_s1_0 = mock_ttx(0, &signing_key_1); + + // clear all on empty returns zero + assert!( + pending_txs.clear_account(&signing_address_0).is_empty(), + "zero transactions should be removed from clearing non existing accounts" + ); + + // add transactions + pending_txs.add(ttx_s0_0.clone(), 0).unwrap(); + pending_txs.add(ttx_s0_1.clone(), 0).unwrap(); + pending_txs.add(ttx_s1_0.clone(), 0).unwrap(); + + // clear should return all transactions + assert_eq!( + pending_txs.clear_account(&signing_address_0), + vec![ttx_s0_0.tx_hash, ttx_s0_1.tx_hash], + "all transactions should be returned from clearing account" + ); + + assert_eq!(pending_txs.txs.len(), 1, "empty account should be removed"); + assert_eq!( + pending_txs.len(), + 1, + "should only have one transaction tracked" + ); + assert!( + pending_txs.contains_tx(&ttx_s1_0.tx_hash), + "other account should be untouched" + ); + } + + #[tokio::test] + async fn transactions_container_clean_accounts() { + let mut pending_txs = PendingTransactions::new(TX_TTL); + let signing_key_0 = SigningKey::from([1; 32]); + let signing_address_0 = signing_key_0.address_bytes(); + let signing_key_1 = SigningKey::from([2; 32]); + let signing_address_1 = signing_key_1.address_bytes(); + let signing_key_2 = SigningKey::from([3; 32]); + let signing_address_2 = signing_key_2.address_bytes(); + + // transactions to add to accounts + let ttx_s0_0 = mock_ttx(0, &signing_key_0); + let ttx_s0_1 = mock_ttx(1, &signing_key_0); + let ttx_s0_2 = mock_ttx(2, &signing_key_0); + let ttx_s1_0 = mock_ttx(0, &signing_key_1); + let ttx_s1_1 = mock_ttx(1, &signing_key_1); + let ttx_s1_2 = mock_ttx(2, &signing_key_1); + let ttx_s2_0 = mock_ttx(0, &signing_key_2); + let ttx_s2_1 = mock_ttx(1, &signing_key_2); + let ttx_s2_2 = mock_ttx(2, &signing_key_2); + + // add transactions + pending_txs.add(ttx_s0_0.clone(), 
0).unwrap(); + pending_txs.add(ttx_s0_1.clone(), 0).unwrap(); + pending_txs.add(ttx_s0_2.clone(), 0).unwrap(); + pending_txs.add(ttx_s1_0.clone(), 0).unwrap(); + pending_txs.add(ttx_s1_1.clone(), 0).unwrap(); + pending_txs.add(ttx_s1_2.clone(), 0).unwrap(); + pending_txs.add(ttx_s2_0.clone(), 0).unwrap(); + pending_txs.add(ttx_s2_1.clone(), 0).unwrap(); + pending_txs.add(ttx_s2_2.clone(), 0).unwrap(); + + // current nonce getter + // should pop none from signing_address_0, one from signing_address_1, and all from + // signing_address_2 + let current_account_nonce_getter = |address: [u8; 20]| async move { + if address == signing_address_0 { + Ok(0) + } else if address == signing_address_1 { + Ok(1) + } else if address == signing_address_2 { + Ok(4) + } else { + Err(anyhow::anyhow!("invalid address")) + } + }; + + let removed_txs = pending_txs + .clean_accounts(¤t_account_nonce_getter) + .await; + + assert_eq!( + removed_txs.len(), + 4, + "four transactions should've been popped" + ); + assert_eq!(pending_txs.txs.len(), 2, "empty accounts should be removed"); + assert_eq!( + pending_txs.len(), + 5, + "5 transactions should be remaining from original 9" + ); + assert!(pending_txs.contains_tx(&ttx_s0_0.tx_hash)); + assert!(pending_txs.contains_tx(&ttx_s0_1.tx_hash)); + assert!(pending_txs.contains_tx(&ttx_s0_2.tx_hash)); + assert!(pending_txs.contains_tx(&ttx_s1_1.tx_hash)); + assert!(pending_txs.contains_tx(&ttx_s1_2.tx_hash)); + + assert_eq!( + pending_txs.txs.get(&signing_address_0).unwrap().txs().len(), + 3 + ); + assert_eq!( + pending_txs.txs.get(&signing_address_1).unwrap().txs().len(), + 2 + ); + for (_, reason) in removed_txs { + assert!( + matches!(reason, RemovalReason::NonceStale), + "removal reason should be stale nonce" + ); + } + } + + #[tokio::test(start_paused = true)] + async fn transactions_container_clean_accounts_expired_transactions() { + let mut pending_txs = PendingTransactions::new(TX_TTL); + let signing_key_0 = SigningKey::from([1; 32]); + let 
signing_address_0 = signing_key_0.address_bytes(); + let signing_key_1 = SigningKey::from([2; 32]); + let signing_address_1 = signing_key_1.address_bytes(); + + // transactions to add to accounts + let ttx_s0_0 = mock_ttx(0, &signing_key_0); + + // pass time to make first transaction stale + tokio::time::advance(TX_TTL.saturating_add(Duration::from_nanos(1))).await; + + let ttx_s0_1 = mock_ttx(1, &signing_key_0); + let ttx_s1_0 = mock_ttx(0, &signing_key_1); + + // add transactions + pending_txs.add(ttx_s0_0.clone(), 0).unwrap(); + pending_txs.add(ttx_s0_1.clone(), 0).unwrap(); + pending_txs.add(ttx_s1_0.clone(), 0).unwrap(); + + // current nonce getter + // all nonces should be valid + let current_account_nonce_getter = |address: [u8; 20]| async move { + if address == signing_address_0 || address == signing_address_1 { + return Ok(0); + } + Err(anyhow::anyhow!("invalid address")) + }; + + let removed_txs = pending_txs + .clean_accounts(¤t_account_nonce_getter) + .await; + + assert_eq!( + removed_txs.len(), + 2, + "two transactions should've been popped" + ); + assert_eq!(pending_txs.txs.len(), 1, "empty accounts should be removed"); + assert_eq!( + pending_txs.len(), + 1, + "1 transaction should be remaining from original 3" + ); + assert!( + pending_txs.contains_tx(&ttx_s1_0.tx_hash), + "not expired account should be untouched" + ); + + // check removal reasons + assert_eq!( + removed_txs[0], + (ttx_s0_0.tx_hash, RemovalReason::Expired), + "first should be first pushed tx with removal reason as expired" + ); + assert_eq!( + removed_txs[1], + (ttx_s0_1.tx_hash, RemovalReason::LowerNonceInvalidated), + "second should be second added tx with removal reason as lower nonce invalidation" + ); + } + + #[test] + fn pending_transactions_pending_nonce() { + let mut pending_txs = PendingTransactions::new(TX_TTL); + let signing_key_0 = SigningKey::from([1; 32]); + let signing_address_0 = signing_key_0.address_bytes(); + + let signing_key_1 = SigningKey::from([2; 32]); + let 
signing_address_1 = signing_key_1.address_bytes(); + + // transactions to add for account 0 + let ttx_s0_0 = mock_ttx(0, &signing_key_0); + let ttx_s0_1 = mock_ttx(1, &signing_key_0); + + pending_txs.add(ttx_s0_0, 0).unwrap(); + pending_txs.add(ttx_s0_1, 0).unwrap(); + + // empty account returns zero + assert!( + pending_txs.pending_nonce(signing_address_1).is_none(), + "empty account should return None" + ); + + // non empty account returns highest nonce + assert_eq!( + pending_txs.pending_nonce(signing_address_0), + Some(1), + "should return highest nonce" + ); + } + + #[tokio::test] + async fn pending_transactions_builder_queue() { + let mut pending_txs = PendingTransactions::new(TX_TTL); + let signing_key_0 = SigningKey::from([1; 32]); + let signing_address_0 = signing_key_0.address_bytes(); + let signing_key_1 = SigningKey::from([2; 32]); + let signing_address_1 = signing_key_1.address_bytes(); + + // transactions to add to accounts + let ttx_s0_1 = mock_ttx(1, &signing_key_0); + let ttx_s1_1 = mock_ttx(1, &signing_key_1); + let ttx_s1_2 = mock_ttx(2, &signing_key_1); + let ttx_s1_3 = mock_ttx(3, &signing_key_1); + + // add transactions + pending_txs.add(ttx_s0_1.clone(), 1).unwrap(); + pending_txs.add(ttx_s1_1.clone(), 1).unwrap(); + pending_txs.add(ttx_s1_2.clone(), 1).unwrap(); + pending_txs.add(ttx_s1_3.clone(), 1).unwrap(); + + // current nonce getter + // should return all transactions from signing_key_0 and last two from signing_key_1 + let current_account_nonce_getter = |address: [u8; 20]| async move { + if address == signing_address_0 { + Ok(1) + } else if address == signing_address_1 { + Ok(2) + } else { + Err(anyhow::anyhow!("invalid address")) + } + }; + + // get builder queue + let builder_queue = pending_txs + .builder_queue(&current_account_nonce_getter) + .await + .expect("building builders queue should work"); + assert_eq!( + builder_queue.len(), + 3, + "three transactions should've been popped" + ); + + // check that the transactions are in the 
expected order + let (first_tx_hash, _) = builder_queue[0]; + assert_eq!( + first_tx_hash, ttx_s0_1.tx_hash, + "expected earliest transaction with lowest nonce difference (0) to be first" + ); + let (second_tx_hash, _) = builder_queue[1]; + assert_eq!( + second_tx_hash, ttx_s1_2.tx_hash, + "expected other low nonce diff (0) to be second" + ); + let (third_tx_hash, _) = builder_queue[2]; + assert_eq!( + third_tx_hash, ttx_s1_3.tx_hash, + "expected highest nonce diff to be last" + ); + + // ensure transactions not removed + assert_eq!( + pending_txs.len(), + 4, + "no transactions should've been removed" + ); + } + + #[tokio::test] + async fn parked_transactions_pop_front_account() { + let mut parked_txs = ParkedTransactions::::new(TX_TTL); + let signing_key_0 = SigningKey::from([1; 32]); + let signing_address_0 = signing_key_0.address_bytes(); + let signing_key_1 = SigningKey::from([2; 32]); + let signing_address_1 = signing_key_1.address_bytes(); + + // transactions to add to accounts + let ttx_s0_1 = mock_ttx(1, &signing_key_0); + let ttx_s1_1 = mock_ttx(1, &signing_key_1); + let ttx_s1_2 = mock_ttx(2, &signing_key_1); + let ttx_s1_4 = mock_ttx(4, &signing_key_1); + + // add transactions + parked_txs.add(ttx_s0_1.clone(), 0).unwrap(); + parked_txs.add(ttx_s1_1.clone(), 0).unwrap(); + parked_txs.add(ttx_s1_2.clone(), 0).unwrap(); + parked_txs.add(ttx_s1_4.clone(), 0).unwrap(); + + // pop from account 1 + assert_eq!( + parked_txs.pop_front_account(&signing_address_0, 1).len(), + 1, + "one transactions should've been popped" + ); + assert_eq!(parked_txs.txs.len(), 1, "empty accounts should be removed"); + + // pop from account 2 + assert_eq!( + parked_txs.pop_front_account(&signing_address_1, 1).len(), + 2, + "two transactions should've been popped" + ); + assert_eq!( + parked_txs.txs.len(), + 1, + "non empty accounts should not be removed" + ); + + assert_eq!( + parked_txs.len(), + 1, + "1 transactions should be remaining from original 4" + ); + 
assert!(parked_txs.contains_tx(&ttx_s1_4.tx_hash)); + } + + #[tokio::test] + async fn parked_transactions_find_promotables() { + let mut parked_txs = ParkedTransactions::::new(TX_TTL); + let signing_key_0 = SigningKey::from([1; 32]); + let signing_address_0 = signing_key_0.address_bytes(); + let signing_key_1 = SigningKey::from([2; 32]); + let signing_address_1 = signing_key_1.address_bytes(); + + // transactions to add to accounts + let ttx_s0_1 = mock_ttx(1, &signing_key_0); + let ttx_s0_2 = mock_ttx(2, &signing_key_0); + let ttx_s0_3 = mock_ttx(3, &signing_key_0); + let ttx_s1_1 = mock_ttx(1, &signing_key_1); + let ttx_s1_2 = mock_ttx(2, &signing_key_1); + let ttx_s1_4 = mock_ttx(4, &signing_key_1); + + // add transactions + parked_txs.add(ttx_s0_1.clone(), 0).unwrap(); + parked_txs.add(ttx_s0_2.clone(), 0).unwrap(); + parked_txs.add(ttx_s0_3.clone(), 0).unwrap(); + parked_txs.add(ttx_s1_1.clone(), 0).unwrap(); + parked_txs.add(ttx_s1_2.clone(), 0).unwrap(); + parked_txs.add(ttx_s1_4.clone(), 0).unwrap(); + + // current nonce getter + // should pop all from signing_address_0 and two from signing_address_1 + let current_account_nonce_getter = |address: [u8; 20]| async move { + if address == signing_address_0 || address == signing_address_1 { + return Ok(1); + } + Err(anyhow::anyhow!("invalid address")) + }; + + assert_eq!( + parked_txs + .find_promotables(&current_account_nonce_getter) + .await + .len(), + 5, + "five transactions should've been popped" + ); + assert_eq!(parked_txs.txs.len(), 1, "empty accounts should be removed"); + assert_eq!( + parked_txs.len(), + 1, + "1 transactions should be remaining from original 6" + ); + assert!(parked_txs.contains_tx(&ttx_s1_4.tx_hash)); + } +} diff --git a/crates/astria-sequencer/src/service/consensus.rs b/crates/astria-sequencer/src/service/consensus.rs index a57706023b..0049267657 100644 --- a/crates/astria-sequencer/src/service/consensus.rs +++ b/crates/astria-sequencer/src/service/consensus.rs @@ -202,6 +202,7 @@ mod 
test { use std::{ collections::HashMap, str::FromStr, + sync::Arc, }; use astria_core::{ @@ -287,12 +288,12 @@ mod test { let (mut consensus_service, mempool) = new_consensus_service(Some(signing_key.verification_key())).await; let tx = make_unsigned_tx(); - let signed_tx = tx.into_signed(&signing_key); - let tx_bytes = signed_tx.clone().into_raw().encode_to_vec(); + let signed_tx = Arc::new(tx.into_signed(&signing_key)); + let tx_bytes = signed_tx.to_raw().encode_to_vec(); let txs = vec![tx_bytes.into()]; mempool.insert(signed_tx.clone(), 0).await.unwrap(); - let res = generate_rollup_datas_commitment(&vec![signed_tx], HashMap::new()); + let res = generate_rollup_datas_commitment(&vec![(*signed_tx).clone()], HashMap::new()); let prepare_proposal = new_prepare_proposal_request(); let prepare_proposal_response = consensus_service @@ -492,10 +493,10 @@ mod test { new_consensus_service(Some(signing_key.verification_key())).await; let tx = make_unsigned_tx(); - let signed_tx = tx.into_signed(&signing_key); - let tx_bytes = signed_tx.clone().into_raw().encode_to_vec(); + let signed_tx = Arc::new(tx.into_signed(&signing_key)); + let tx_bytes = signed_tx.to_raw().encode_to_vec(); let txs = vec![tx_bytes.clone().into()]; - let res = generate_rollup_datas_commitment(&vec![signed_tx.clone()], HashMap::new()); + let res = generate_rollup_datas_commitment(&vec![(*signed_tx).clone()], HashMap::new()); let block_data = res.into_transactions(txs.clone()); let data_hash = diff --git a/crates/astria-sequencer/src/service/mempool.rs b/crates/astria-sequencer/src/service/mempool.rs index 838578cbd4..33bd874207 100644 --- a/crates/astria-sequencer/src/service/mempool.rs +++ b/crates/astria-sequencer/src/service/mempool.rs @@ -1,5 +1,6 @@ use std::{ pin::Pin, + sync::Arc, task::{ Context, Poll, @@ -127,7 +128,6 @@ async fn handle_check_tx MAX_TX_SIZE { - mempool.remove(tx_hash).await; metrics.increment_check_tx_removed_too_large(); return response::CheckTx { code: 
Code::Err(AbciErrorCode::TRANSACTION_TOO_LARGE.value()), @@ -143,7 +143,6 @@ async fn handle_check_tx tx, Err(e) => { - mempool.remove(tx_hash).await; return response::CheckTx { code: Code::Err(AbciErrorCode::INVALID_PARAMETER.value()), log: e.to_string(), @@ -155,7 +154,6 @@ async fn handle_check_tx tx, Err(e) => { - mempool.remove(tx_hash).await; return response::CheckTx { code: Code::Err(AbciErrorCode::INVALID_PARAMETER.value()), info: "the provided bytes was not a valid protobuf-encoded SignedTransaction, or \ @@ -173,7 +171,6 @@ async fn handle_check_tx { metrics.increment_check_tx_removed_expired(); @@ -257,6 +255,34 @@ async fn handle_check_tx { + return response::CheckTx { + code: Code::Err(AbciErrorCode::INVALID_NONCE.value()), + info: "transaction removed from app mempool due to stale nonce".into(), + log: "Transaction from app mempool due to stale nonce".into(), + ..response::CheckTx::default() + }; + } + RemovalReason::LowerNonceInvalidated => { + return response::CheckTx { + code: Code::Err(AbciErrorCode::LOWER_NONCE_INVALIDATED.value()), + info: "transaction removed from app mempool due to lower nonce being \ + invalidated" + .into(), + log: "Transaction removed from app mempool due to lower nonce being \ + invalidated" + .into(), + ..response::CheckTx::default() + }; + } + RemovalReason::FailedCheckTx(err) => { + return response::CheckTx { + code: Code::Err(AbciErrorCode::TRANSACTION_FAILED.value()), + info: "transaction failed check tx".into(), + log: format!("transaction failed check tx because: {err}"), + ..response::CheckTx::default() + }; + } } }; @@ -285,13 +311,18 @@ async fn handle_check_tx