diff --git a/crates/astria-core/src/primitive/v1/mod.rs b/crates/astria-core/src/primitive/v1/mod.rs index 13a392c19e..944d6e40bd 100644 --- a/crates/astria-core/src/primitive/v1/mod.rs +++ b/crates/astria-core/src/primitive/v1/mod.rs @@ -250,7 +250,7 @@ pub struct IncorrectAddressLength { received: usize, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(serde::Serialize))] pub struct Address( #[cfg_attr(feature = "serde", serde(serialize_with = "crate::serde::base64"))] diff --git a/crates/astria-sequencer/src/app/mod.rs b/crates/astria-sequencer/src/app/mod.rs index 49ac6bd444..31b9e49ce5 100644 --- a/crates/astria-sequencer/src/app/mod.rs +++ b/crates/astria-sequencer/src/app/mod.rs @@ -474,11 +474,12 @@ impl App { let mut execution_results = Vec::new(); let mut txs_to_readd_to_mempool = Vec::new(); - while let Some((tx, priority)) = self.mempool.pop().await { + while let Some((enqueued_tx, priority)) = self.mempool.pop().await { + let tx_hash_base64 = telemetry::display::base64(&enqueued_tx.tx_hash()).to_string(); + let tx = enqueued_tx.signed_tx(); let bytes = tx.to_raw().encode_to_vec(); - let tx_hash = Sha256::digest(&bytes); let tx_len = bytes.len(); - info!(tx_hash = %telemetry::display::hex(&tx_hash), "executing transaction"); + info!(transaction_hash = %tx_hash_base64, "executing transaction"); // don't include tx if it would make the cometBFT block too large if !block_size_constraints.cometbft_has_space(tx_len) { @@ -487,12 +488,12 @@ impl App { ) .increment(1); debug!( - transaction_hash = %telemetry::display::base64(&tx_hash), + transaction_hash = %tx_hash_base64, block_size_constraints = %json(&block_size_constraints), tx_data_bytes = tx_len, "excluding remaining transactions: max cometBFT data limit reached" ); - txs_to_readd_to_mempool.push((tx, priority)); + txs_to_readd_to_mempool.push((enqueued_tx, priority)); // break from loop, as the block is full break; @@ -508,12 +509,12 @@ impl App { if !block_size_constraints.sequencer_has_space(tx_sequence_data_bytes) { debug!( - transaction_hash = %telemetry::display::base64(&tx_hash), + transaction_hash = %tx_hash_base64, block_size_constraints = %json(&block_size_constraints), tx_data_bytes = tx_sequence_data_bytes, "excluding transaction: max block sequenced data limit reached" ); - txs_to_readd_to_mempool.push((tx, priority)); + txs_to_readd_to_mempool.push((enqueued_tx, priority)); // continue as there might be non-sequence txs that can fit continue; @@ -533,7 +534,7 @@ impl App { .cometbft_checked_add(tx_len) .context("error growing cometBFT block size")?; validated_txs.push(bytes.into()); - included_signed_txs.push(tx); + included_signed_txs.push((*tx).clone()); } Err(e) => { metrics::counter!( @@ -541,7 +542,7 @@ impl App { ) .increment(1); debug!( - transaction_hash = %telemetry::display::base64(&tx_hash), + transaction_hash = %tx_hash_base64, error = AsRef::::as_ref(&e), "failed to execute transaction, not including in block" ); @@ -552,7 +553,7 @@ impl App { // if it's invalid due to the nonce being too low, it'll be // removed from the mempool in `update_mempool_after_finalization`. if e.downcast_ref::().is_some() { - txs_to_readd_to_mempool.push((tx, priority)); + txs_to_readd_to_mempool.push((enqueued_tx, priority)); } } } @@ -566,14 +567,7 @@ impl App { ); } - self.mempool - .insert_all(txs_to_readd_to_mempool) - .await - .expect( - "priority transaction nonce and transaction nonce matches for each tx, as they \ - were just popped from the mempool and not modified", - ); - + self.mempool.insert_all(txs_to_readd_to_mempool).await; let mempool_len = self.mempool.len().await; debug!(mempool_len, "finished executing transactions from mempool"); @@ -634,7 +628,7 @@ impl App { } // execute tx and store in `execution_results` list on success - match self.execute_transaction(tx.clone()).await { + match self.execute_transaction(Arc::new(tx.clone())).await { Ok(events) => { execution_results.push(ExecTxResult { events, @@ -799,12 +793,12 @@ impl App { for tx in finalize_block.txs.iter().skip(2) { // remove any included txs from the mempool let tx_hash = Sha256::digest(tx).into(); - self.mempool.remove(&tx_hash).await; + self.mempool.remove(tx_hash).await; let signed_tx = signed_transaction_from_bytes(tx) .context("protocol error; only valid txs should be finalized")?; - match self.execute_transaction(signed_tx).await { + match self.execute_transaction(Arc::new(signed_tx)).await { Ok(events) => tx_results.push(ExecTxResult { events, ..Default::default() @@ -972,8 +966,8 @@ impl App { ))] pub(crate) async fn execute_transaction( &mut self, - signed_tx: astria_core::protocol::transaction::v1alpha1::SignedTransaction, - ) -> anyhow::Result> { + signed_tx: Arc, + ) -> anyhow::Result> { let signed_tx_2 = signed_tx.clone(); let stateless = tokio::spawn(async move { transaction::check_stateless(&signed_tx_2).await }); @@ -1130,33 +1124,10 @@ async fn update_mempool_after_finalization( mempool: &mut Mempool, state: S, ) -> anyhow::Result<()> { - use crate::mempool::TransactionPriority; - - let mut txs_to_remove = Vec::new(); - - for (tx, priority) in mempool.write().await.iter_mut() { - match TransactionPriority::new( - tx.nonce(), - state - .get_account_nonce(*tx.verification_key().address()) - .await - .context("failed to fetch account nonce")?, - ) { - Ok(new_priority) => *priority = new_priority, - Err(e) => { - let tx_hash = tx.sha256_of_proto_encoding(); - debug!( - transaction_hash = %telemetry::display::base64(&tx_hash), - error = AsRef::::as_ref(&e), - "account nonce is now greater than tx nonce; dropping tx from mempool", - ); - txs_to_remove.push(tx_hash); - } - }; - } - - mempool.remove_all(&txs_to_remove).await; - Ok(()) + let current_account_nonce_getter = |address: Address| state.get_account_nonce(address); + mempool + .update_priorities(current_account_nonce_getter) + .await } /// relevant data of a block being executed. diff --git a/crates/astria-sequencer/src/app/tests_app.rs b/crates/astria-sequencer/src/app/tests_app.rs index 5dbfc4f02a..9e718df325 100644 --- a/crates/astria-sequencer/src/app/tests_app.rs +++ b/crates/astria-sequencer/src/app/tests_app.rs @@ -52,7 +52,6 @@ use crate::{ StateWriteExt, }, genesis::Account, - mempool::TransactionPriority, proposal::commitment::generate_rollup_datas_commitment, state_ext::StateReadExt as _, }; @@ -463,10 +462,7 @@ async fn app_execution_results_match_proposal_vs_after_proposal() { // don't commit the result, now call prepare_proposal with the same data. // this will reset the app state. // this simulates executing the same block as a validator (specifically the proposer). - app.mempool - .insert(signed_tx, TransactionPriority::new(0, 0).unwrap()) - .await - .unwrap(); + app.mempool.insert(signed_tx, 0).await.unwrap(); let proposer_address = [88u8; 20].to_vec().try_into().unwrap(); let prepare_proposal = PrepareProposal { @@ -575,14 +571,8 @@ async fn app_prepare_proposal_cometbft_max_bytes_overflow_ok() { } .into_signed(&alice_signing_key); - app.mempool - .insert(tx_pass, TransactionPriority::new(0, 0).unwrap()) - .await - .unwrap(); - app.mempool - .insert(tx_overflow, TransactionPriority::new(1, 0).unwrap()) - .await - .unwrap(); + app.mempool.insert(tx_pass, 0).await.unwrap(); + app.mempool.insert(tx_overflow, 0).await.unwrap(); // send to prepare_proposal let prepare_args = abci::request::PrepareProposal { @@ -654,14 +644,8 @@ async fn app_prepare_proposal_sequencer_max_bytes_overflow_ok() { } .into_signed(&alice_signing_key); - app.mempool - .insert(tx_pass, TransactionPriority::new(0, 0).unwrap()) - .await - .unwrap(); - app.mempool - .insert(tx_overflow, TransactionPriority::new(1, 0).unwrap()) - .await - .unwrap(); + app.mempool.insert(tx_pass, 0).await.unwrap(); + app.mempool.insert(tx_overflow, 0).await.unwrap(); // send to prepare_proposal let prepare_args = abci::request::PrepareProposal { @@ -755,54 +739,3 @@ async fn app_end_block_validator_updates() { assert_eq!(validator_c.power, 100u32.into()); assert_eq!(app.state.get_validator_updates().await.unwrap().len(), 0); } - -#[tokio::test] -async fn update_mempool_after_finalization_update_account_nonce() { - let mut mempool = Mempool::new(); - - let storage = cnidarium::TempStorage::new().await.unwrap(); - let snapshot = storage.latest_snapshot(); - - // insert tx with nonce 1, account nonce is 0 - let tx = get_mock_tx(1); - let address = *tx.verification_key().address(); - let priority = TransactionPriority::new(1, 0).unwrap(); - mempool.insert(tx.clone(), priority).await.unwrap(); - - // update account nonce to 1 - let mut state_tx = StateDelta::new(snapshot.clone()); - state_tx.put_account_nonce(address, 1).unwrap(); - storage.commit(state_tx).await.unwrap(); - - // ensure that mempool tx priority was updated - update_mempool_after_finalization(&mut mempool, storage.latest_snapshot()) - .await - .unwrap(); - let (_, priority) = mempool.pop().await.unwrap(); - assert_eq!(priority, TransactionPriority::new(1, 1).unwrap()); -} - -#[tokio::test] -async fn update_mempool_after_finalization_remove_tx_if_nonce_too_low() { - let mut mempool = Mempool::new(); - - let storage = cnidarium::TempStorage::new().await.unwrap(); - let snapshot = storage.latest_snapshot(); - - // insert tx with nonce 1, account nonce is 1 - let tx = get_mock_tx(1); - let address = *tx.verification_key().address(); - let priority = TransactionPriority::new(1, 1).unwrap(); - mempool.insert(tx.clone(), priority).await.unwrap(); - - // update account nonce to 2 - let mut state_tx = StateDelta::new(snapshot.clone()); - state_tx.put_account_nonce(address, 2).unwrap(); - storage.commit(state_tx).await.unwrap(); - - // ensure that tx was removed from mempool - update_mempool_after_finalization(&mut mempool, storage.latest_snapshot()) - .await - .unwrap(); - assert!(mempool.pop().await.is_none()); -} diff --git a/crates/astria-sequencer/src/app/tests_breaking_changes.rs b/crates/astria-sequencer/src/app/tests_breaking_changes.rs index 1cf2232902..cccf427a2b 100644 --- a/crates/astria-sequencer/src/app/tests_breaking_changes.rs +++ b/crates/astria-sequencer/src/app/tests_breaking_changes.rs @@ -8,7 +8,11 @@ //! Note: there are two actions not tested here: `Ics20Withdrawal` and `IbcRelay`. //! These are due to the extensive setup needed to test them. //! If changes are made to the execution results of these actions, manual testing is required. -use std::collections::HashMap; + +use std::{ + collections::HashMap, + sync::Arc, +}; use astria_core::{ primitive::v1::{ @@ -240,7 +244,7 @@ async fn app_execute_transaction_with_every_action_snapshot() { ], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); app.execute_transaction(signed_tx).await.unwrap(); // execute BridgeUnlock action @@ -260,7 +264,7 @@ async fn app_execute_transaction_with_every_action_snapshot() { ], }; - let signed_tx = tx.into_signed(&bridge_signing_key); + let signed_tx = Arc::new(tx.into_signed(&bridge_signing_key)); app.execute_transaction(signed_tx).await.unwrap(); app.prepare_commit(storage.clone()).await.unwrap(); diff --git a/crates/astria-sequencer/src/app/tests_execute_transaction.rs b/crates/astria-sequencer/src/app/tests_execute_transaction.rs index ca9c891f6d..96fd92bc51 100644 --- a/crates/astria-sequencer/src/app/tests_execute_transaction.rs +++ b/crates/astria-sequencer/src/app/tests_execute_transaction.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + #[cfg(feature = "mint")] use astria_core::protocol::transaction::v1alpha1::action::MintAction; use astria_core::{ @@ -69,7 +71,7 @@ async fn app_execute_transaction_transfer() { ], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); app.execute_transaction(signed_tx).await.unwrap(); let native_asset = get_native_asset().id(); @@ -126,7 +128,7 @@ async fn app_execute_transaction_transfer_not_native_token() { ], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); app.execute_transaction(signed_tx).await.unwrap(); let native_asset = get_native_asset().id(); @@ -192,7 +194,7 @@ async fn app_execute_transaction_transfer_balance_too_low_for_fee() { ], }; - let signed_tx = tx.into_signed(&keypair); + let signed_tx = Arc::new(tx.into_signed(&keypair)); let res = app .execute_transaction(signed_tx) .await @@ -231,7 +233,7 @@ async fn app_execute_transaction_sequence() { ], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); app.execute_transaction(signed_tx).await.unwrap(); assert_eq!(app.state.get_account_nonce(alice_address).await.unwrap(), 1); @@ -268,7 +270,7 @@ async fn app_execute_transaction_invalid_fee_asset() { ], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); assert!(app.execute_transaction(signed_tx).await.is_err()); } @@ -302,7 +304,7 @@ async fn app_execute_transaction_validator_update() { actions: vec![Action::ValidatorUpdate(update.clone())], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); app.execute_transaction(signed_tx).await.unwrap(); assert_eq!(app.state.get_account_nonce(alice_address).await.unwrap(), 1); @@ -335,7 +337,7 @@ async fn app_execute_transaction_ibc_relayer_change_addition() { actions: vec![IbcRelayerChangeAction::Addition(alice_address).into()], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); app.execute_transaction(signed_tx).await.unwrap(); assert_eq!(app.state.get_account_nonce(alice_address).await.unwrap(), 1); assert!(app.state.is_ibc_relayer(&alice_address).await.unwrap()); @@ -365,7 +367,7 @@ async fn app_execute_transaction_ibc_relayer_change_deletion() { actions: vec![IbcRelayerChangeAction::Removal(alice_address).into()], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); app.execute_transaction(signed_tx).await.unwrap(); assert_eq!(app.state.get_account_nonce(alice_address).await.unwrap(), 1); assert!(!app.state.is_ibc_relayer(&alice_address).await.unwrap()); @@ -395,7 +397,7 @@ async fn app_execute_transaction_ibc_relayer_change_invalid() { actions: vec![IbcRelayerChangeAction::Removal(alice_address).into()], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); assert!(app.execute_transaction(signed_tx).await.is_err()); } @@ -427,7 +429,7 @@ async fn app_execute_transaction_sudo_address_change() { })], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); app.execute_transaction(signed_tx).await.unwrap(); assert_eq!(app.state.get_account_nonce(alice_address).await.unwrap(), 1); @@ -462,7 +464,7 @@ async fn app_execute_transaction_sudo_address_change_error() { })], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); let res = app .execute_transaction(signed_tx) .await @@ -502,7 +504,7 @@ async fn app_execute_transaction_fee_asset_change_addition() { ))], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); app.execute_transaction(signed_tx).await.unwrap(); assert_eq!(app.state.get_account_nonce(alice_address).await.unwrap(), 1); @@ -541,7 +543,7 @@ async fn app_execute_transaction_fee_asset_change_removal() { ))], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); app.execute_transaction(signed_tx).await.unwrap(); assert_eq!(app.state.get_account_nonce(alice_address).await.unwrap(), 1); @@ -581,7 +583,7 @@ async fn app_execute_transaction_fee_asset_change_invalid() { ))], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); let res = app .execute_transaction(signed_tx) .await @@ -617,7 +619,7 @@ async fn app_execute_transaction_init_bridge_account_ok() { actions: vec![action.into()], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); let before_balance = app .state @@ -672,7 +674,7 @@ async fn app_execute_transaction_init_bridge_account_account_already_registered( actions: vec![action.into()], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); app.execute_transaction(signed_tx).await.unwrap(); let action = InitBridgeAccountAction { @@ -688,7 +690,7 @@ async fn app_execute_transaction_init_bridge_account_account_already_registered( actions: vec![action.into()], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); assert!(app.execute_transaction(signed_tx).await.is_err()); } @@ -724,7 +726,7 @@ async fn app_execute_transaction_bridge_lock_action_ok() { actions: vec![action.into()], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); let alice_before_balance = app .state @@ -802,7 +804,7 @@ async fn app_execute_transaction_bridge_lock_action_invalid_for_eoa() { actions: vec![action.into()], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); assert!(app.execute_transaction(signed_tx).await.is_err()); } @@ -839,7 +841,7 @@ async fn app_execute_transaction_mint() { ], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); app.execute_transaction(signed_tx).await.unwrap(); assert_eq!( @@ -876,7 +878,7 @@ async fn app_execute_transaction_invalid_nonce() { ], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); let response = app.execute_transaction(signed_tx).await; // check that tx was not executed by checking nonce and balance are unchanged @@ -922,7 +924,7 @@ async fn app_execute_transaction_invalid_chain_id() { ], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); let response = app.execute_transaction(signed_tx).await; // check that tx was not executed by checking nonce and balance are unchanged @@ -984,7 +986,7 @@ async fn app_stateful_check_fails_insufficient_total_balance() { .into_signed(&alice_signing_key); // make transfer - app.execute_transaction(signed_tx).await.unwrap(); + app.execute_transaction(Arc::new(signed_tx)).await.unwrap(); // build double transfer exceeding balance let signed_tx_fail = UnsignedTransaction { @@ -1081,7 +1083,7 @@ async fn app_execute_transaction_bridge_lock_unlock_action_ok() { actions: vec![action.into()], }; - let signed_tx = tx.into_signed(&alice_signing_key); + let signed_tx = Arc::new(tx.into_signed(&alice_signing_key)); app.execute_transaction(signed_tx).await.unwrap(); assert_eq!(app.state.get_account_nonce(alice_address).await.unwrap(), 1); @@ -1102,7 +1104,7 @@ async fn app_execute_transaction_bridge_lock_unlock_action_ok() { actions: vec![action.into()], }; - let signed_tx = tx.into_signed(&bridge_signing_key); + let signed_tx = Arc::new(tx.into_signed(&bridge_signing_key)); app.execute_transaction(signed_tx) .await .expect("executing bridge unlock action should succeed"); diff --git a/crates/astria-sequencer/src/grpc/sequencer.rs b/crates/astria-sequencer/src/grpc/sequencer.rs index da60cb43eb..f8fb17e0c4 100644 --- a/crates/astria-sequencer/src/grpc/sequencer.rs +++ b/crates/astria-sequencer/src/grpc/sequencer.rs @@ -221,7 +221,6 @@ mod test { use super::*; use crate::{ api_state_ext::StateWriteExt as _, - mempool::TransactionPriority, state_ext::StateWriteExt, }; @@ -255,19 +254,17 @@ mod test { #[tokio::test] async fn get_pending_nonce_in_mempool() { let storage = cnidarium::TempStorage::new().await.unwrap(); - let mut mempool = Mempool::new(); + let mempool = Mempool::new(); let (_, address) = crate::app::test_utils::get_alice_signing_key_and_address(); let nonce = 99; let tx = crate::app::test_utils::get_mock_tx(nonce); - let priority = TransactionPriority::new(nonce, 0).unwrap(); - mempool.insert(tx, priority).await.unwrap(); + mempool.insert(tx, 0).await.unwrap(); // insert a tx with lower nonce also, but we should get the highest nonce let lower_nonce = 98; let tx = crate::app::test_utils::get_mock_tx(lower_nonce); - let priority = TransactionPriority::new(lower_nonce, 0).unwrap(); - mempool.insert(tx, priority).await.unwrap(); + mempool.insert(tx, 0).await.unwrap(); let server = Arc::new(SequencerServer::new(storage.clone(), mempool)); let request = GetPendingNonceRequest { diff --git a/crates/astria-sequencer/src/mempool.rs b/crates/astria-sequencer/src/mempool.rs index 0795dba77c..5cd5addc6e 100644 --- a/crates/astria-sequencer/src/mempool.rs +++ b/crates/astria-sequencer/src/mempool.rs @@ -1,39 +1,54 @@ use std::{ - cmp::Ordering, + cmp::{ + self, + Ordering, + }, collections::HashMap, - sync::Arc, + future::Future, + sync::{ + Arc, + OnceLock, + }, }; +use anyhow::Context; use astria_core::{ + crypto::SigningKey, primitive::v1::Address, - protocol::transaction::v1alpha1::SignedTransaction, + protocol::transaction::v1alpha1::{ + SignedTransaction, + TransactionParams, + UnsignedTransaction, + }, }; -use priority_queue::double_priority_queue::DoublePriorityQueue; +use priority_queue::PriorityQueue; use tokio::sync::RwLock; -use tracing::instrument; +use tracing::debug; + +type MempoolQueue = PriorityQueue; /// Used to prioritize transactions in the mempool. /// /// The priority is calculated as the difference between the transaction nonce and the current /// account nonce. The lower the difference, the higher the priority. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Clone, Debug)] pub(crate) struct TransactionPriority { - transaction_nonce: u32, - current_account_nonce: u32, + nonce_diff: u32, } -impl TransactionPriority { - fn nonce_diff(&self) -> u32 { - self.transaction_nonce - self.current_account_nonce +impl PartialEq for TransactionPriority { + fn eq(&self, other: &Self) -> bool { + self.nonce_diff == other.nonce_diff } } +impl Eq for TransactionPriority {} + impl Ord for TransactionPriority { - #[allow(clippy::non_canonical_partial_ord_impl)] fn cmp(&self, other: &Self) -> Ordering { // we want to execute the lowest nonce first, // so lower nonce difference means higher priority - self.nonce_diff().cmp(&other.nonce_diff()).reverse() + self.nonce_diff.cmp(&other.nonce_diff).reverse() } } @@ -43,212 +58,232 @@ impl PartialOrd for TransactionPriority { } } -impl TransactionPriority { - pub(crate) fn new(transaction_nonce: u32, current_account_nonce: u32) -> anyhow::Result { - if transaction_nonce < current_account_nonce { +#[derive(Clone, Debug)] +pub(crate) struct EnqueuedTransaction { + tx_hash: [u8; 32], + signed_tx: Arc, +} + +impl EnqueuedTransaction { + fn new(signed_tx: SignedTransaction) -> Self { + Self { + tx_hash: signed_tx.sha256_of_proto_encoding(), + signed_tx: Arc::new(signed_tx), + } + } + + fn priority(&self, current_account_nonce: u32) -> anyhow::Result { + let Some(nonce_diff) = self.signed_tx.nonce().checked_sub(current_account_nonce) else { return Err(anyhow::anyhow!( - "transaction nonce {} is less than current account nonce {}", - transaction_nonce, - current_account_nonce + "transaction nonce {} is less than current account nonce {current_account_nonce}", + self.signed_tx.nonce() )); - } + }; - Ok(Self { - transaction_nonce, - current_account_nonce, + Ok(TransactionPriority { + nonce_diff, }) } -} -/// [`BasicMempool`] is a simple mempool implementation that stores transactions in a priority queue -/// ordered by nonce. -/// -/// Future extensions to this mempool can include: -/// - maximum mempool size -/// - fee-based ordering -/// - transaction expiration -pub(crate) struct BasicMempool { - queue: DoublePriorityQueue<[u8; 32], TransactionPriority>, - hash_to_tx: HashMap<[u8; 32], SignedTransaction>, -} + pub(crate) fn tx_hash(&self) -> [u8; 32] { + self.tx_hash + } -impl BasicMempool { - #[must_use] - fn new() -> Self { - Self { - queue: DoublePriorityQueue::new(), - hash_to_tx: HashMap::new(), - } + pub(crate) fn signed_tx(&self) -> Arc { + self.signed_tx.clone() } - #[must_use] - pub(crate) fn iter(&self) -> BasicMempoolIter { - BasicMempoolIter { - iter: self.queue.iter(), - hash_to_tx: &self.hash_to_tx, - } + pub(crate) fn address(&self) -> &Address { + self.signed_tx.verification_key().address() } +} - #[must_use] - pub(crate) fn iter_mut(&mut self) -> BasicMempoolIterMut { - BasicMempoolIterMut { - iter: self.queue.iter_mut(), - hash_to_tx: &mut self.hash_to_tx, - } +/// Only consider `self.tx_hash` for equality. This is consistent with the impl for std `Hash`. +impl PartialEq for EnqueuedTransaction { + fn eq(&self, other: &Self) -> bool { + self.tx_hash == other.tx_hash + } +} + +impl Eq for EnqueuedTransaction {} + +/// Only consider `self.tx_hash` when hashing. This is consistent with the impl for equality. +impl std::hash::Hash for EnqueuedTransaction { + fn hash(&self, state: &mut H) { + self.tx_hash.hash(state); } } -/// [`Mempool`] is a wrapper around [`BasicMempool`] that isolates the -/// locking mechanism for the mempool. +/// [`Mempool`] is an internally-synchronized wrapper around a prioritized queue of transactions +/// awaiting execution. +/// +/// The priority is calculated as the difference between the transaction nonce and the current +/// account nonce. The lower the difference, the higher the priority. +/// +/// Future extensions to this mempool can include: +/// - maximum mempool size +/// - fee-based ordering +/// - transaction expiration #[derive(Clone)] pub(crate) struct Mempool { - inner: Arc>, + inner: Arc>, } impl Mempool { #[must_use] pub(crate) fn new() -> Self { Self { - inner: Arc::new(RwLock::new(BasicMempool::new())), + inner: Arc::new(RwLock::new(MempoolQueue::new())), } } /// returns the number of transactions in the mempool #[must_use] pub(crate) async fn len(&self) -> usize { - let inner = self.inner.read().await; - inner.queue.len() + self.inner.read().await.len() } /// inserts a transaction into the mempool /// /// note: if the tx already exists in the mempool, it's overwritten with the new priority. pub(crate) async fn insert( - &mut self, + &self, tx: SignedTransaction, - priority: TransactionPriority, + current_account_nonce: u32, ) -> anyhow::Result<()> { - if tx.nonce() != priority.transaction_nonce { - anyhow::bail!("transaction nonce does not match `transaction_nonce` in priority"); - } - - let hash = tx.sha256_of_proto_encoding(); - let mut inner = self.inner.write().await; - inner.queue.push(hash, priority); - inner.hash_to_tx.insert(hash, tx); - tracing::trace!(tx_hash = %telemetry::display::hex(hash.as_ref()), "inserted transaction into mempool"); + let enqueued_tx = EnqueuedTransaction::new(tx); + let priority = enqueued_tx.priority(current_account_nonce)?; + let tx_hash = enqueued_tx.tx_hash; + self.inner.write().await.push(enqueued_tx, priority); + tracing::trace!( + tx_hash = %telemetry::display::hex(&tx_hash), + "inserted transaction into mempool" + ); Ok(()) } /// inserts all the given transactions into the mempool - pub(crate) async fn insert_all( - &mut self, - txs: Vec<(SignedTransaction, TransactionPriority)>, - ) -> anyhow::Result<()> { - for (tx, priority) in txs { - self.insert(tx, priority).await?; - } - Ok(()) + pub(crate) async fn insert_all(&self, txs: Vec<(EnqueuedTransaction, TransactionPriority)>) { + self.inner.write().await.extend(txs); } /// pops the transaction with the highest priority from the mempool #[must_use] - pub(crate) async fn pop(&mut self) -> Option<(SignedTransaction, TransactionPriority)> { - let mut inner = self.inner.write().await; - let (hash, priority) = inner.queue.pop_max()?; - let tx = inner.hash_to_tx.remove(&hash)?; - Some((tx, priority)) + pub(crate) async fn pop(&self) -> Option<(EnqueuedTransaction, TransactionPriority)> { + self.inner.write().await.pop() } /// removes a transaction from the mempool - pub(crate) async fn remove(&mut self, tx_hash: &[u8; 32]) { - let mut inner = self.inner.write().await; - inner.queue.remove(tx_hash); - inner.hash_to_tx.remove(tx_hash); + pub(crate) async fn remove(&self, tx_hash: [u8; 32]) { + let enqueued_tx = EnqueuedTransaction { + tx_hash, + signed_tx: dummy_signed_tx().clone(), + }; + self.inner.write().await.remove(&enqueued_tx); } - /// removes all the given transactions from the mempool - pub(crate) async fn remove_all(&mut self, tx_hashes: &[[u8; 32]]) { - for tx_hash in tx_hashes { - self.remove(tx_hash).await; + /// Updates the priority of the txs in the mempool based on the current state, and removes any + /// that are now invalid. + /// + /// *NOTE*: this function locks the mempool until every tx has been checked. This could + /// potentially stall consensus from moving to the next round if the mempool is large. + pub(crate) async fn update_priorities( + &self, + current_account_nonce_getter: F, + ) -> anyhow::Result<()> + where + F: Fn(Address) -> O, + O: Future>, + { + let mut txs_to_remove = Vec::new(); + let mut current_account_nonces = HashMap::new(); + + let mut queue = self.inner.write().await; + for (enqueued_tx, priority) in queue.iter_mut() { + let address = enqueued_tx.address(); + // Try to get the current account nonce from the ones already retrieved. + let current_account_nonce = if let Some(nonce) = current_account_nonces.get(&address) { + *nonce + } else { + // Fall back to getting via the getter and adding it to the local temp collection. + let nonce = current_account_nonce_getter(*enqueued_tx.address()) + .await + .context("failed to fetch account nonce")?; + current_account_nonces.insert(address, nonce); + nonce + }; + match enqueued_tx.priority(current_account_nonce) { + Ok(new_priority) => *priority = new_priority, + Err(e) => { + debug!( + transaction_hash = %telemetry::display::base64(&enqueued_tx.tx_hash), + error = AsRef::::as_ref(&e), + "account nonce is now greater than tx nonce; dropping tx from mempool", + ); + txs_to_remove.push(enqueued_tx.clone()); + } + }; } - } - /// returns the inner mempool, write-locked. - /// required so that `BasicMempool::iter_mut()` can be called. - pub(crate) async fn write(&self) -> tokio::sync::RwLockWriteGuard<'_, BasicMempool> { - self.inner.write().await + for enqueued_tx in txs_to_remove { + queue.remove(&enqueued_tx); + } + + Ok(()) } /// returns the pending nonce for the given address, /// if it exists in the mempool. - #[instrument(skip(self), fields(address = %address))] pub(crate) async fn pending_nonce(&self, address: &Address) -> Option { let inner = self.inner.read().await; let mut nonce = None; - for (tx, priority) in inner.iter() { - let sender = tx.verification_key().address(); - if sender == address { - nonce = Some(std::cmp::max( - nonce.unwrap_or_default(), - priority.transaction_nonce, - )); + for (tx, _priority) in inner.iter() { + if tx.address() == address { + nonce = Some(cmp::max(nonce.unwrap_or_default(), tx.signed_tx.nonce())); } } nonce } } -pub(crate) struct BasicMempoolIter<'a> { - iter: priority_queue::core_iterators::Iter<'a, [u8; 32], TransactionPriority>, - hash_to_tx: &'a HashMap<[u8; 32], SignedTransaction>, -} - -impl<'a> Iterator for BasicMempoolIter<'a> { - type Item = (&'a SignedTransaction, &'a TransactionPriority); - - fn next(&mut self) -> Option { - self.iter.next().map(|(hash, priority)| { - let tx = self - .hash_to_tx - .get(hash) - .expect("hash in queue must be in hash_to_tx"); - (tx, priority) - }) - } -} - -pub(crate) struct BasicMempoolIterMut<'a> { - iter: priority_queue::double_priority_queue::iterators::IterMut< - 'a, - [u8; 32], - TransactionPriority, - >, - hash_to_tx: &'a HashMap<[u8; 32], SignedTransaction>, -} - -impl<'a> Iterator for BasicMempoolIterMut<'a> { - type Item = (&'a SignedTransaction, &'a mut TransactionPriority); - - fn next(&mut self) -> Option { - self.iter.next().map(|(hash, priority)| { - let tx = self - .hash_to_tx - .get(hash) - .expect("hash in queue must be in hash_to_tx"); - (tx, priority) - }) - } +/// This exists to provide a `SignedTransaction` for the purposes of removing an entry from the +/// queue where we only have the tx hash available. +/// +/// The queue is indexed by `EnqueuedTransaction` which internally needs a `SignedTransaction`, but +/// this `signed_tx` field is ignored in the `PartialEq` and `Hash` impls of `EnqueuedTransaction` - +/// only the tx hash is considered. So we create an `EnqueuedTransaction` on the fly with the +/// correct tx hash and this dummy signed tx when removing from the queue. +fn dummy_signed_tx() -> &'static Arc { + static TX: OnceLock> = OnceLock::new(); + TX.get_or_init(|| { + let actions = vec![]; + let params = TransactionParams { + nonce: 0, + chain_id: String::new(), + }; + let signing_key = SigningKey::from([0; 32]); + let unsigned_tx = UnsignedTransaction { + actions, + params, + }; + Arc::new(unsigned_tx.into_signed(&signing_key)) + }) } #[cfg(test)] mod test { + use std::hash::{ + Hash, + Hasher, + }; + use super::*; use crate::app::test_utils::get_mock_tx; #[test] - fn transaction_priority_invalid() { - let priority = TransactionPriority::new(0, 1); + fn transaction_priority_should_error_if_invalid() { + let enqueued_tx = EnqueuedTransaction::new(get_mock_tx(0)); + let priority = enqueued_tx.priority(1); assert!( priority .unwrap_err() @@ -257,55 +292,261 @@ mod test { ); } + // From https://doc.rust-lang.org/std/cmp/trait.PartialOrd.html #[test] - fn mempool_nonce_priority() { - let priority_0 = TransactionPriority { - transaction_nonce: 0, - current_account_nonce: 0, + // allow: we want explicit assertions here to match the documented expected behavior. + #[allow(clippy::nonminimal_bool)] + fn transaction_priority_comparisons_should_be_consistent() { + let high = TransactionPriority { + nonce_diff: 0, }; - let priority_1 = TransactionPriority { - transaction_nonce: 1, - current_account_nonce: 0, + let low = TransactionPriority { + nonce_diff: 1, }; - assert!(priority_0 > priority_1); - assert!(priority_0 == priority_0); - assert!(priority_1 < priority_0); + assert!(high.partial_cmp(&high) == Some(Ordering::Equal)); + assert!(high.partial_cmp(&low) == Some(Ordering::Greater)); + assert!(low.partial_cmp(&high) == Some(Ordering::Less)); + + // 1. a == b if and only if partial_cmp(a, b) == Some(Equal) + assert!(high == high); // Some(Equal) + assert!(!(high == low)); // Some(Greater) + assert!(!(low == high)); // Some(Less) + + // 2. a < b if and only if partial_cmp(a, b) == Some(Less) + assert!(low < high); // Some(Less) + assert!(!(high < high)); // Some(Equal) + assert!(!(high < low)); // Some(Greater) + + // 3. a > b if and only if partial_cmp(a, b) == Some(Greater) + assert!(high > low); // Some(Greater) + assert!(!(high > high)); // Some(Equal) + assert!(!(low > high)); // Some(Less) + + // 4. a <= b if and only if a < b || a == b + assert!(low <= high); // a < b + assert!(high <= high); // a == b + assert!(!(high <= low)); // a > b + + // 5. a >= b if and only if a > b || a == b + assert!(high >= low); // a > b + assert!(high >= high); // a == b + assert!(!(low >= high)); // a < b + + // 6. a != b if and only if !(a == b) + assert!(high != low); // asserted !(high == low) above + assert!(low != high); // asserted !(low == high) above + assert!(!(high != high)); // asserted high == high above + } + + #[test] + // From https://doc.rust-lang.org/std/hash/trait.Hash.html#hash-and-eq + fn enqueued_tx_hash_and_eq_should_be_consistent() { + // Check enqueued txs compare equal if and only if their tx hashes are equal. + let tx0 = EnqueuedTransaction { + tx_hash: [0; 32], + signed_tx: Arc::new(get_mock_tx(0)), + }; + let other_tx0 = EnqueuedTransaction { + tx_hash: [0; 32], + signed_tx: Arc::new(get_mock_tx(1)), + }; + let tx1 = EnqueuedTransaction { + tx_hash: [1; 32], + signed_tx: Arc::new(get_mock_tx(0)), + }; + assert!(tx0 == other_tx0); + assert!(tx0 != tx1); + + // Check enqueued txs' std hashes compare equal if and only if their tx hashes are equal. + let std_hash = |enqueued_tx: &EnqueuedTransaction| -> u64 { + let mut hasher = std::hash::DefaultHasher::new(); + enqueued_tx.hash(&mut hasher); + hasher.finish() + }; + assert!(std_hash(&tx0) == std_hash(&other_tx0)); + assert!(std_hash(&tx0) != std_hash(&tx1)); } #[tokio::test] - async fn mempool_insert_pop() { - let mut mempool = Mempool::new(); + async fn should_insert_and_pop() { + let mempool = Mempool::new(); + // Priority 0 (highest priority). let tx0 = get_mock_tx(0); - let priority0 = TransactionPriority::new(0, 0).unwrap(); - mempool - .insert(tx0.clone(), priority0.clone()) - .await - .unwrap(); + mempool.insert(tx0.clone(), 0).await.unwrap(); + // Priority 1. let tx1 = get_mock_tx(1); - let priority1 = TransactionPriority::new(1, 0).unwrap(); + mempool.insert(tx1.clone(), 0).await.unwrap(); + + assert_eq!(mempool.len().await, 2); + + // Should pop priority 0 first. + let (tx, priority) = mempool.pop().await.unwrap(); + assert_eq!( + tx.signed_tx.sha256_of_proto_encoding(), + tx0.sha256_of_proto_encoding() + ); + assert_eq!(priority.nonce_diff, 0); + assert_eq!(mempool.len().await, 1); + + // Should pop priority 1 second. + let (tx, priority) = mempool.pop().await.unwrap(); + assert_eq!( + tx.signed_tx.sha256_of_proto_encoding(), + tx1.sha256_of_proto_encoding() + ); + assert_eq!(priority.nonce_diff, 1); + assert_eq!(mempool.len().await, 0); + } + + #[tokio::test] + async fn should_remove() { + let mempool = Mempool::new(); + let tx_count = 5_usize; + + let current_account_nonce = 0; + let txs: Vec<_> = (0..tx_count) + .map(|index| { + let enqueued_tx = + EnqueuedTransaction::new(get_mock_tx(u32::try_from(index).unwrap())); + let priority = enqueued_tx.priority(current_account_nonce).unwrap(); + (enqueued_tx, priority) + }) + .collect(); + mempool.insert_all(txs.clone()).await; + assert_eq!(mempool.len().await, tx_count); + + // Remove the last tx. + let last_tx_hash = txs.last().unwrap().0.tx_hash; + mempool.remove(last_tx_hash).await; + let mut expected_remaining_count = tx_count.checked_sub(1).unwrap(); + assert_eq!(mempool.len().await, expected_remaining_count); + + // Removing it again should have no effect. + mempool.remove(last_tx_hash).await; + assert_eq!(mempool.len().await, expected_remaining_count); + + // Remove the first tx. + mempool.remove(txs.first().unwrap().0.tx_hash).await; + expected_remaining_count = expected_remaining_count.checked_sub(1).unwrap(); + assert_eq!(mempool.len().await, expected_remaining_count); + + // Check the next tx popped is the second priority. + let (tx, priority) = mempool.pop().await.unwrap(); + assert_eq!(tx.tx_hash, txs[1].0.tx_hash()); + assert_eq!(priority.nonce_diff, 1); + } + + #[tokio::test] + async fn should_update_priorities() { + let mempool = Mempool::new(); + + // Insert txs signed by alice with nonces 0 and 1. + mempool.insert(get_mock_tx(0), 0).await.unwrap(); + mempool.insert(get_mock_tx(1), 0).await.unwrap(); + + // Insert txs from a different signer with nonces 100 and 102. + let other_signing_key = SigningKey::from([1; 32]); + let other_mock_tx = |nonce: u32| -> SignedTransaction { + let actions = get_mock_tx(0).actions().to_vec(); + UnsignedTransaction { + params: TransactionParams { + nonce, + chain_id: "test".to_string(), + }, + actions, + } + .into_signed(&other_signing_key) + }; + mempool.insert(other_mock_tx(100), 0).await.unwrap(); + mempool.insert(other_mock_tx(102), 0).await.unwrap(); + + assert_eq!(mempool.len().await, 4); + + let (alice_signing_key, alice_address) = + crate::app::test_utils::get_alice_signing_key_and_address(); + let other_address = *other_signing_key.verification_key().address(); + + // Create a getter fn which will returns 1 for alice's current account nonce, and 101 for + // the other signer's. + let current_account_nonce_getter = |address: Address| async move { + if address == alice_address { + return Ok(1); + } + if address == other_address { + return Ok(101); + } + Err(anyhow::anyhow!("invalid address")) + }; + + // Update the priorities. Alice's first tx (with nonce 0) and other's first (with nonce + // 100) should both get purged. mempool - .insert(tx1.clone(), priority1.clone()) + .update_priorities(current_account_nonce_getter) .await .unwrap(); - assert!(priority0 > priority1); assert_eq!(mempool.len().await, 2); + // Alice's remaining tx should be the highest priority (nonce diff of 1 - 1 == 0). let (tx, priority) = mempool.pop().await.unwrap(); + assert_eq!(tx.signed_tx.nonce(), 1); assert_eq!( - tx.sha256_of_proto_encoding(), - tx0.sha256_of_proto_encoding() + *tx.signed_tx.verification_key(), + alice_signing_key.verification_key() ); - assert_eq!(priority, priority0); + assert_eq!(priority.nonce_diff, 0); + // Other's remaining tx should be the highest priority (nonce diff of 102 - 101 == 1). let (tx, priority) = mempool.pop().await.unwrap(); + assert_eq!(tx.signed_tx.nonce(), 102); assert_eq!( - tx.sha256_of_proto_encoding(), - tx1.sha256_of_proto_encoding() + *tx.signed_tx.verification_key(), + other_signing_key.verification_key() + ); + assert_eq!(priority.nonce_diff, 1); + } + + #[tokio::test] + async fn should_get_pending_nonce() { + let mempool = Mempool::new(); + + // Insert txs signed by alice with nonces 0 and 1. + mempool.insert(get_mock_tx(0), 0).await.unwrap(); + mempool.insert(get_mock_tx(1), 0).await.unwrap(); + + // Insert txs from a different signer with nonces 100 and 101. + let other_signing_key = SigningKey::from([1; 32]); + let other_mock_tx = |nonce: u32| -> SignedTransaction { + let actions = get_mock_tx(0).actions().to_vec(); + UnsignedTransaction { + params: TransactionParams { + nonce, + chain_id: "test".to_string(), + }, + actions, + } + .into_signed(&other_signing_key) + }; + mempool.insert(other_mock_tx(100), 0).await.unwrap(); + mempool.insert(other_mock_tx(101), 0).await.unwrap(); + + assert_eq!(mempool.len().await, 4); + + // Check the pending nonce for alice is 1 and for the other signer is 101. + let alice_address = crate::app::test_utils::get_alice_signing_key_and_address().1; + assert_eq!(mempool.pending_nonce(&alice_address).await.unwrap(), 1); + let other_address = *other_signing_key.verification_key().address(); + assert_eq!(mempool.pending_nonce(&other_address).await.unwrap(), 101); + + // Check the pending nonce for an address with no enqueued txs is `None`. + assert!( + mempool + .pending_nonce(&Address::from([1; 20])) + .await + .is_none() ); - assert_eq!(priority, priority1); } } diff --git a/crates/astria-sequencer/src/service/consensus.rs b/crates/astria-sequencer/src/service/consensus.rs index 55156fe592..993cf74d98 100644 --- a/crates/astria-sequencer/src/service/consensus.rs +++ b/crates/astria-sequencer/src/service/consensus.rs @@ -247,10 +247,7 @@ mod test { use crate::{ app::test_utils::default_fees, asset::get_native_asset, - mempool::{ - Mempool, - TransactionPriority, - }, + mempool::Mempool, proposal::commitment::generate_rollup_datas_commitment, }; @@ -300,14 +297,13 @@ mod test { #[tokio::test] async fn prepare_and_process_proposal() { let signing_key = SigningKey::new(OsRng); - let (mut consensus_service, mut mempool) = + let (mut consensus_service, mempool) = new_consensus_service(Some(signing_key.verification_key())).await; let tx = make_unsigned_tx(); let signed_tx = tx.into_signed(&signing_key); let tx_bytes = signed_tx.clone().into_raw().encode_to_vec(); let txs = vec![tx_bytes.into()]; - let priority = TransactionPriority::new(0, 0).unwrap(); - mempool.insert(signed_tx.clone(), priority).await.unwrap(); + mempool.insert(signed_tx.clone(), 0).await.unwrap(); let res = generate_rollup_datas_commitment(&vec![signed_tx], HashMap::new()); @@ -508,7 +504,7 @@ mod test { use sha2::Digest as _; let signing_key = SigningKey::new(OsRng); - let (mut consensus_service, mut mempool) = + let (mut consensus_service, mempool) = new_consensus_service(Some(signing_key.verification_key())).await; let tx = make_unsigned_tx(); @@ -529,10 +525,7 @@ mod test { .await .unwrap(); - mempool - .insert(signed_tx, TransactionPriority::new(0, 0).unwrap()) - .await - .unwrap(); + mempool.insert(signed_tx, 0).await.unwrap(); let finalize_block = request::FinalizeBlock { hash: Hash::try_from([0u8; 32].to_vec()).unwrap(), height: 1u32.into(), diff --git a/crates/astria-sequencer/src/service/mempool.rs b/crates/astria-sequencer/src/service/mempool.rs index 569f65c3fc..b688bf376a 100644 --- a/crates/astria-sequencer/src/service/mempool.rs +++ b/crates/astria-sequencer/src/service/mempool.rs @@ -104,7 +104,7 @@ async fn handle_check_tx( tx, .. } = req; if tx.len() > MAX_TX_SIZE { - mempool.remove(&tx_hash).await; + mempool.remove(tx_hash).await; metrics::counter!(metrics_init::CHECK_TX_REMOVED_TOO_LARGE).increment(1); return response::CheckTx { code: AbciErrorCode::TRANSACTION_TOO_LARGE.into(), @@ -120,7 +120,7 @@ async fn handle_check_tx( let raw_signed_tx = match raw::SignedTransaction::decode(tx) { Ok(tx) => tx, Err(e) => { - mempool.remove(&tx_hash).await; + mempool.remove(tx_hash).await; return response::CheckTx { code: AbciErrorCode::INVALID_PARAMETER.into(), log: e.to_string(), @@ -132,7 +132,7 @@ async fn handle_check_tx( let signed_tx = match SignedTransaction::try_from_raw(raw_signed_tx) { Ok(tx) => tx, Err(e) => { - mempool.remove(&tx_hash).await; + mempool.remove(tx_hash).await; return response::CheckTx { code: AbciErrorCode::INVALID_PARAMETER.into(), info: "the provided bytes was not a valid protobuf-encoded SignedTransaction, or \ @@ -145,7 +145,7 @@ async fn handle_check_tx( }; if let Err(e) = transaction::check_stateless(&signed_tx).await { - mempool.remove(&tx_hash).await; + mempool.remove(tx_hash).await; metrics::counter!(metrics_init::CHECK_TX_REMOVED_FAILED_STATELESS).increment(1); return response::CheckTx { code: AbciErrorCode::INVALID_PARAMETER.into(), @@ -156,7 +156,7 @@ async fn handle_check_tx( }; if let Err(e) = transaction::check_nonce_mempool(&signed_tx, &state).await { - mempool.remove(&tx_hash).await; + mempool.remove(tx_hash).await; metrics::counter!(metrics_init::CHECK_TX_REMOVED_STALE_NONCE).increment(1); return response::CheckTx { code: AbciErrorCode::INVALID_NONCE.into(), @@ -167,7 +167,7 @@ async fn handle_check_tx( }; if let Err(e) = transaction::check_chain_id_mempool(&signed_tx, &state).await { - mempool.remove(&tx_hash).await; + mempool.remove(tx_hash).await; return response::CheckTx { code: AbciErrorCode::INVALID_CHAIN_ID.into(), info: "failed verifying chain id".into(), @@ -177,7 +177,7 @@ async fn handle_check_tx( } if let Err(e) = transaction::check_balance_mempool(&signed_tx, &state).await { - mempool.remove(&tx_hash).await; + mempool.remove(tx_hash).await; metrics::counter!(metrics_init::CHECK_TX_REMOVED_ACCOUNT_BALANCE).increment(1); return response::CheckTx { code: AbciErrorCode::INSUFFICIENT_FUNDS.into(), @@ -188,20 +188,17 @@ async fn handle_check_tx( }; // tx is valid, push to mempool - let priority = crate::mempool::TransactionPriority::new( - signed_tx.nonce(), - state - .get_account_nonce(*signed_tx.verification_key().address()) - .await - .expect("can fetch account nonce"), - ) - .expect( - "tx nonce is greater or equal to current account nonce; this was checked in \ - check_nonce_mempool", - ); + let current_account_nonce = state + .get_account_nonce(*signed_tx.verification_key().address()) + .await + .expect("can fetch account nonce"); + mempool - .insert(signed_tx, priority) + .insert(signed_tx, current_account_nonce) .await - .expect("priority transaction nonce and transaction nonce match, as we set them above"); + .expect( + "tx nonce is greater than or equal to current account nonce; this was checked in \ + check_nonce_mempool", + ); response::CheckTx::default() }