diff --git a/charts/sequencer/Chart.yaml b/charts/sequencer/Chart.yaml index 6af0ddafc1..a76eec5f5b 100644 --- a/charts/sequencer/Chart.yaml +++ b/charts/sequencer/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.23.1 +version: 0.23.2 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. diff --git a/charts/sequencer/templates/configmaps.yaml b/charts/sequencer/templates/configmaps.yaml index 2f49d4416c..b780ce003c 100644 --- a/charts/sequencer/templates/configmaps.yaml +++ b/charts/sequencer/templates/configmaps.yaml @@ -75,5 +75,6 @@ data: OTEL_SERVICE_NAME: "{{ tpl .Values.sequencer.otel.serviceName . }}" {{- if not .Values.global.dev }} {{- else }} + ASTRIA_SEQUENCER_MEMPOOL_PARKED_MAX_TX_COUNT: "{{ .Values.sequencer.mempool.parked.maxTxCount }}" {{- end }} --- diff --git a/charts/sequencer/values.yaml b/charts/sequencer/values.yaml index 3b7247dab1..85faf008db 100644 --- a/charts/sequencer/values.yaml +++ b/charts/sequencer/values.yaml @@ -71,6 +71,9 @@ genesis: # pubKey: lV57+rGs2vac7mvkGHP1oBFGHPJM3a+WoAzeFDCJDNU= sequencer: + mempool: + parked: + maxTxCount: 200 metrics: enabled: false otel: diff --git a/crates/astria-core/src/protocol/abci.rs b/crates/astria-core/src/protocol/abci.rs index 7cacc6b780..4047ae679f 100644 --- a/crates/astria-core/src/protocol/abci.rs +++ b/crates/astria-core/src/protocol/abci.rs @@ -25,6 +25,7 @@ impl AbciErrorCode { pub const ALREADY_PRESENT: Self = Self(unsafe { NonZeroU32::new_unchecked(14) }); pub const NONCE_TAKEN: Self = Self(unsafe { NonZeroU32::new_unchecked(15) }); pub const ACCOUNT_SIZE_LIMIT: Self = Self(unsafe { NonZeroU32::new_unchecked(16) }); + pub const PARKED_FULL: Self = Self(unsafe { NonZeroU32::new_unchecked(17) }); } impl AbciErrorCode { @@ -62,6 +63,7 @@ impl AbciErrorCode { Self::ACCOUNT_SIZE_LIMIT => { "the account has reached the maximum number of parked transactions".into() } + Self::PARKED_FULL => "the mempool is out of space for more parked transactions".into(), Self(other) => { format!("invalid error code {other}: should be unreachable (this is a bug)") } diff --git a/crates/astria-sequencer/local.env.example b/crates/astria-sequencer/local.env.example index 05cb7937a0..eaee8c607d 100644 --- a/crates/astria-sequencer/local.env.example +++ b/crates/astria-sequencer/local.env.example @@ -10,6 +10,9 @@ ASTRIA_SEQUENCER_DB_FILEPATH="/tmp/astria_db" # Only used if the "mint" feature is enabled ASTRIA_SEQUENCER_ENABLE_MINT=false +# Set size of mempool's parked container +ASTRIA_SEQUENCER_MEMPOOL_PARKED_MAX_TX_COUNT=200 + # Socket address for gRPC server ASTRIA_SEQUENCER_GRPC_ADDR="127.0.0.1:8080" # Log level for the sequencer diff --git a/crates/astria-sequencer/src/app/test_utils.rs b/crates/astria-sequencer/src/app/test_utils.rs index ee97bb92c7..80abf11219 100644 --- a/crates/astria-sequencer/src/app/test_utils.rs +++ b/crates/astria-sequencer/src/app/test_utils.rs @@ -219,7 +219,7 @@ pub(crate) async fn initialize_app_with_storage( .expect("failed to create temp storage backing chain state"); let snapshot = storage.latest_snapshot(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mempool = Mempool::new(metrics); + let mempool = Mempool::new(metrics, 100); let mut app = App::new(snapshot, mempool, metrics).await.unwrap(); let genesis_state = genesis_state.unwrap_or_else(self::genesis_state); diff --git a/crates/astria-sequencer/src/config.rs b/crates/astria-sequencer/src/config.rs index 74955647c8..9b1359c678 100644 --- a/crates/astria-sequencer/src/config.rs +++ b/crates/astria-sequencer/src/config.rs @@ -33,6 +33,8 @@ pub struct Config { pub metrics_http_listener_addr: String, /// Writes a human readable format to stdout instead of JSON formatted OTEL trace data. pub pretty_print: bool, + /// The maximum number of transactions that can be parked in the mempool. + pub mempool_parked_max_tx_count: usize, } impl config::Config for Config { diff --git a/crates/astria-sequencer/src/grpc/sequencer.rs b/crates/astria-sequencer/src/grpc/sequencer.rs index d75d00df13..f6248bda41 100644 --- a/crates/astria-sequencer/src/grpc/sequencer.rs +++ b/crates/astria-sequencer/src/grpc/sequencer.rs @@ -258,7 +258,7 @@ mod tests { let block = make_test_sequencer_block(1); let storage = cnidarium::TempStorage::new().await.unwrap(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mempool = Mempool::new(metrics); + let mempool = Mempool::new(metrics, 100); let mut state_tx = StateDelta::new(storage.latest_snapshot()); state_tx.put_block_height(1).unwrap(); state_tx.put_sequencer_block(block).unwrap(); @@ -277,7 +277,7 @@ mod tests { async fn get_pending_nonce_in_mempool() { let storage = cnidarium::TempStorage::new().await.unwrap(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mempool = Mempool::new(metrics); + let mempool = Mempool::new(metrics, 100); let alice = get_alice_signing_key(); let alice_address = astria_address(&alice.address_bytes()); @@ -328,7 +328,7 @@ mod tests { let storage = cnidarium::TempStorage::new().await.unwrap(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mempool = Mempool::new(metrics); + let mempool = Mempool::new(metrics, 100); let mut state_tx = StateDelta::new(storage.latest_snapshot()); let alice = get_alice_signing_key(); let alice_address = astria_address(&alice.address_bytes()); diff --git a/crates/astria-sequencer/src/mempool/benchmarks.rs b/crates/astria-sequencer/src/mempool/benchmarks.rs index fb710a3148..5fd452f4c9 100644 --- a/crates/astria-sequencer/src/mempool/benchmarks.rs +++ b/crates/astria-sequencer/src/mempool/benchmarks.rs @@ -105,7 +105,7 @@ fn init_mempool() -> Mempool { .build() .unwrap(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mempool = Mempool::new(metrics); + let mempool = Mempool::new(metrics, T::size()); let account_mock_balance = mock_balances(0, 0); let tx_mock_cost = mock_tx_cost(0, 0, 0); runtime.block_on(async { diff --git a/crates/astria-sequencer/src/mempool/mod.rs b/crates/astria-sequencer/src/mempool/mod.rs index 6573bbfbd5..900c03420f 100644 --- a/crates/astria-sequencer/src/mempool/mod.rs +++ b/crates/astria-sequencer/src/mempool/mod.rs @@ -35,6 +35,7 @@ use transactions_container::{ ParkedTransactions, PendingTransactions, TimemarkedTransaction, + TransactionsContainer as _, }; use crate::{ @@ -170,10 +171,13 @@ pub(crate) struct Mempool { impl Mempool { #[must_use] - pub(crate) fn new(metrics: &'static Metrics) -> Self { + pub(crate) fn new(metrics: &'static Metrics, parked_max_tx_count: usize) -> Self { Self { pending: Arc::new(RwLock::new(PendingTransactions::new(TX_TTL))), - parked: Arc::new(RwLock::new(ParkedTransactions::new(TX_TTL))), + parked: Arc::new(RwLock::new(ParkedTransactions::new( + TX_TTL, + parked_max_tx_count, + ))), comet_bft_removal_cache: Arc::new(RwLock::new(RemovalCache::new( NonZeroUsize::try_from(REMOVAL_CACHE_SIZE) .expect("Removal cache cannot be zero sized"), @@ -227,6 +231,10 @@ impl Mempool { ¤t_account_balances, ) { Ok(()) => { + // log current size of parked + self.metrics + .set_transactions_in_mempool_parked(parked.len()); + // track in contained txs self.lock_contained_txs().await.add(id); Ok(()) @@ -238,7 +246,8 @@ impl Mempool { InsertionError::AlreadyPresent | InsertionError::NonceTooLow | InsertionError::NonceTaken - | InsertionError::AccountSizeLimit, + | InsertionError::AccountSizeLimit + | InsertionError::ParkedSizeLimit, ) => error, Ok(()) => { // check parked for txs able to be promoted @@ -441,8 +450,8 @@ impl Mempool { let tx_id = tx.id(); if let Err(error) = parked.add(tx, current_nonce, ¤t_balances) { // NOTE: this shouldn't happen normally but could on the edge case of - // the parked queue being full for the account. This also means - // grabbing the lock inside the loop is more performant. + // the parked queue being full for the account or globally. + // Grabbing the lock inside the loop should be more performant. self.lock_contained_txs().await.remove(tx_id); self.metrics.increment_internal_logic_error(); error!( @@ -514,7 +523,7 @@ mod tests { #[tokio::test] async fn insert() { let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mempool = Mempool::new(metrics); + let mempool = Mempool::new(metrics, 100); let account_balances = mock_balances(100, 100); let tx_cost = mock_tx_cost(10, 10, 0); @@ -550,7 +559,7 @@ mod tests { tx1_replacement.clone(), 0, account_balances.clone(), - tx_cost.clone() + tx_cost.clone(), ) .await .unwrap_err(), @@ -578,7 +587,7 @@ mod tests { // odder edge cases that can be hit if a node goes offline or fails to see // some transactions that other nodes include into their proposed blocks. let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mempool = Mempool::new(metrics); + let mempool = Mempool::new(metrics, 100); let account_balances = mock_balances(100, 100); let tx_cost = mock_tx_cost(10, 10, 0); @@ -680,7 +689,7 @@ mod tests { #[tokio::test] async fn run_maintenance_promotion() { let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mempool = Mempool::new(metrics); + let mempool = Mempool::new(metrics, 100); // create transaction setup to trigger promotions // @@ -753,7 +762,7 @@ mod tests { #[tokio::test] async fn run_maintenance_demotion() { let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mempool = Mempool::new(metrics); + let mempool = Mempool::new(metrics, 100); // create transaction setup to trigger demotions // @@ -848,7 +857,7 @@ mod tests { #[tokio::test] async fn remove_invalid() { let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mempool = Mempool::new(metrics); + let mempool = Mempool::new(metrics, 100); let account_balances = mock_balances(100, 100); let tx_cost = mock_tx_cost(10, 10, 10); @@ -951,7 +960,7 @@ mod tests { #[tokio::test] async fn should_get_pending_nonce() { let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mempool = Mempool::new(metrics); + let mempool = Mempool::new(metrics, 100); let account_balances = mock_balances(100, 100); let tx_cost = mock_tx_cost(10, 10, 0); @@ -985,7 +994,7 @@ mod tests { tx100.clone(), 100, account_balances.clone(), - tx_cost.clone() + tx_cost.clone(), ) .await .is_ok(), @@ -1001,7 +1010,7 @@ mod tests { tx101.clone(), 100, account_balances.clone(), - tx_cost.clone() + tx_cost.clone(), ) .await .is_ok(), @@ -1094,7 +1103,7 @@ mod tests { #[tokio::test] async fn tx_tracked_invalid_removal_removes_all() { let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mempool = Mempool::new(metrics); + let mempool = Mempool::new(metrics, 100); let account_balances = mock_balances(100, 100); let tx_cost = mock_tx_cost(10, 10, 0); @@ -1128,7 +1137,7 @@ mod tests { #[tokio::test] async fn tx_tracked_maintenance_removes_all() { let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mempool = Mempool::new(metrics); + let mempool = Mempool::new(metrics, 100); let account_balances = mock_balances(100, 100); let tx_cost = mock_tx_cost(10, 10, 0); @@ -1161,7 +1170,7 @@ mod tests { #[tokio::test] async fn tx_tracked_reinsertion_ok() { let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mempool = Mempool::new(metrics); + let mempool = Mempool::new(metrics, 100); let account_balances = mock_balances(100, 100); let tx_cost = mock_tx_cost(10, 10, 0); @@ -1200,4 +1209,30 @@ mod tests { assert!(mempool.is_tracked(tx0.id().get()).await); assert!(mempool.is_tracked(tx1.id().get()).await); } + + #[tokio::test] + async fn parked_limit_enforced() { + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics, 1); + let account_balances = mock_balances(100, 100); + let tx_cost = mock_tx_cost(10, 10, 0); + + let tx0 = MockTxBuilder::new().nonce(1).build(); + let tx1 = MockTxBuilder::new().nonce(2).build(); + + mempool + .insert(tx1.clone(), 0, account_balances.clone(), tx_cost.clone()) + .await + .unwrap(); + + // size limit fails as expected + assert_eq!( + mempool + .insert(tx0.clone(), 0, account_balances.clone(), tx_cost.clone()) + .await + .unwrap_err(), + InsertionError::ParkedSizeLimit, + "size limit should be enforced" + ); + } } diff --git a/crates/astria-sequencer/src/mempool/transactions_container.rs b/crates/astria-sequencer/src/mempool/transactions_container.rs index 10d120df15..c055fda88c 100644 --- a/crates/astria-sequencer/src/mempool/transactions_container.rs +++ b/crates/astria-sequencer/src/mempool/transactions_container.rs @@ -31,10 +31,6 @@ use crate::{ transaction, }; -pub(super) type PendingTransactions = TransactionsContainer; -pub(super) type ParkedTransactions = - TransactionsContainer>; - /// `TimemarkedTransaction` is a wrapper around a signed transaction used to keep track of when that /// transaction was first seen in the mempool. #[derive(Clone, Debug)] @@ -172,6 +168,7 @@ pub(crate) enum InsertionError { NonceGap, AccountSizeLimit, AccountBalanceTooLow, + ParkedSizeLimit, } impl fmt::Display for InsertionError { @@ -192,6 +189,9 @@ impl fmt::Display for InsertionError { InsertionError::AccountBalanceTooLow => { write!(f, "account does not have enough balance to cover costs") } + InsertionError::ParkedSizeLimit => { + write!(f, "parked container size limit reached") + } } } } @@ -487,36 +487,92 @@ pub(super) trait TransactionsForAccount: Default { } } -/// `TransactionsContainer` is a container used for managing transactions for multiple accounts. +/// A container used for managing pending transactions for multiple accounts. #[derive(Clone, Debug)] -pub(super) struct TransactionsContainer { - /// A map of collections of transactions, indexed by the account address. - txs: HashMap<[u8; 20], T>, +pub(super) struct PendingTransactions { + txs: HashMap<[u8; 20], PendingTransactionsForAccount>, tx_ttl: Duration, } -impl TransactionsContainer { - pub(super) fn new(tx_ttl: Duration) -> Self { - TransactionsContainer:: { - txs: HashMap::new(), - tx_ttl, +/// A container used for managing parked transactions for multiple accounts. +#[derive(Clone, Debug)] +pub(super) struct ParkedTransactions { + txs: HashMap<[u8; 20], ParkedTransactionsForAccount>, + tx_ttl: Duration, + max_tx_count: usize, +} + +impl TransactionsContainer for PendingTransactions { + fn txs(&self) -> &HashMap<[u8; 20], PendingTransactionsForAccount> { + &self.txs + } + + fn txs_mut(&mut self) -> &mut HashMap<[u8; 20], PendingTransactionsForAccount> { + &mut self.txs + } + + fn tx_ttl(&self) -> Duration { + self.tx_ttl + } + + fn check_total_tx_count(&self) -> Result<(), InsertionError> { + Ok(()) + } +} + +impl + TransactionsContainer> + for ParkedTransactions +{ + fn txs(&self) -> &HashMap<[u8; 20], ParkedTransactionsForAccount> { + &self.txs + } + + fn txs_mut( + &mut self, + ) -> &mut HashMap<[u8; 20], ParkedTransactionsForAccount> { + &mut self.txs + } + + fn tx_ttl(&self) -> Duration { + self.tx_ttl + } + + fn check_total_tx_count(&self) -> Result<(), InsertionError> { + if self.len() >= self.max_tx_count { + return Err(InsertionError::ParkedSizeLimit); } + Ok(()) } +} + +/// `TransactionsContainer` is a container used for managing transactions for multiple accounts. +pub(super) trait TransactionsContainer { + fn txs(&self) -> &HashMap<[u8; 20], T>; + + fn txs_mut(&mut self) -> &mut HashMap<[u8; 20], T>; + + fn tx_ttl(&self) -> Duration; + + fn check_total_tx_count(&self) -> Result<(), InsertionError>; /// Returns all of the currently tracked addresses. - pub(super) fn addresses(&self) -> impl Iterator { - self.txs.keys() + fn addresses<'a>(&'a self) -> impl Iterator + where + T: 'a, + { + self.txs().keys() } /// Recosts transactions for an account. /// /// Logs an error if fails to recost a transaction. - pub(super) async fn recost_transactions( + async fn recost_transactions( &mut self, address: &[u8; 20], state: &S, ) { - let Some(account) = self.txs.get_mut(address) else { + let Some(account) = self.txs_mut().get_mut(address) else { return; }; @@ -543,13 +599,15 @@ impl TransactionsContainer { /// `current_account_nonce` should be the current nonce of the account associated with the /// transaction. If this ever decreases, the `TransactionsContainer` containers could become /// invalid. - pub(super) fn add( + fn add( &mut self, ttx: TimemarkedTransaction, current_account_nonce: u32, current_account_balances: &HashMap, ) -> Result<(), InsertionError> { - match self.txs.entry(*ttx.address()) { + self.check_total_tx_count()?; + + match self.txs_mut().entry(*ttx.address()) { hash_map::Entry::Occupied(entry) => { entry .into_mut() @@ -569,14 +627,14 @@ impl TransactionsContainer { /// /// If `signed_tx` existed, returns `Ok` with the hashes of the removed transactions. If /// `signed_tx` was not in the collection, it is returned via `Err`. - pub(super) fn remove( + fn remove( &mut self, signed_tx: Arc, ) -> Result, Arc> { let address = signed_tx.verification_key().address_bytes(); // Take the collection for this account out of `self` temporarily. - let Some(mut account_txs) = self.txs.remove(address) else { + let Some(mut account_txs) = self.txs_mut().remove(address) else { return Err(signed_tx); }; @@ -584,7 +642,7 @@ impl TransactionsContainer { // Re-add the collection to `self` if it's not empty. if !account_txs.txs().is_empty() { - let _ = self.txs.insert(*address, account_txs); + let _ = self.txs_mut().insert(*address, account_txs); } if removed.is_empty() { @@ -596,21 +654,21 @@ impl TransactionsContainer { /// Removes all of the transactions for the given account and returns the hashes of the removed /// transactions. - pub(super) fn clear_account(&mut self, address: &[u8; 20]) -> Vec<[u8; 32]> { - self.txs + fn clear_account(&mut self, address: &[u8; 20]) -> Vec<[u8; 32]> { + self.txs_mut() .remove(address) .map(|account_txs| account_txs.txs().values().map(|ttx| ttx.tx_hash).collect()) .unwrap_or_default() } /// Cleans the specified account of stale and expired transactions. - pub(super) fn clean_account_stale_expired( + fn clean_account_stale_expired( &mut self, address: &[u8; 20], current_account_nonce: u32, ) -> Vec<([u8; 32], RemovalReason)> { // Take the collection for this account out of `self` temporarily if it exists. - let Some(mut account_txs) = self.txs.remove(address) else { + let Some(mut account_txs) = self.txs_mut().remove(address) else { return Vec::new(); }; @@ -624,7 +682,7 @@ impl TransactionsContainer { // check for expired transactions if let Some(first_tx) = account_txs.txs_mut().first_entry() { - if first_tx.get().is_expired(Instant::now(), self.tx_ttl) { + if first_tx.get().is_expired(Instant::now(), self.tx_ttl()) { removed_txs.push((first_tx.get().tx_hash, RemovalReason::Expired)); removed_txs.extend( account_txs @@ -639,15 +697,15 @@ impl TransactionsContainer { // Re-add the collection to `self` if it's not empty. if !account_txs.txs().is_empty() { - let _ = self.txs.insert(*address, account_txs); + let _ = self.txs_mut().insert(*address, account_txs); } removed_txs } /// Returns the number of transactions in the container. - pub(super) fn len(&self) -> usize { - self.txs + fn len(&self) -> usize { + self.txs() .values() .map(|account_txs| account_txs.txs().len()) .sum() @@ -655,13 +713,20 @@ impl TransactionsContainer { #[cfg(test)] fn contains_tx(&self, tx_hash: &[u8; 32]) -> bool { - self.txs + self.txs() .values() .any(|account_txs| account_txs.contains_tx(tx_hash)) } } -impl TransactionsContainer { +impl PendingTransactions { + pub(super) fn new(tx_ttl: Duration) -> Self { + PendingTransactions { + txs: HashMap::new(), + tx_ttl, + } + } + /// Remove and return transactions that should be moved from pending to parked /// based on the specified account's current balances. pub(super) fn find_demotables( @@ -755,7 +820,15 @@ impl TransactionsContainer { } } -impl TransactionsContainer> { +impl ParkedTransactions { + pub(super) fn new(tx_ttl: Duration, max_tx_count: usize) -> Self { + ParkedTransactions { + txs: HashMap::new(), + tx_ttl, + max_tx_count, + } + } + /// Removes and returns the transactions that can be promoted from parked to pending for /// an account. Will only return sequential nonces from `target_nonce` whose costs are /// covered by the `available_balance`. @@ -1149,7 +1222,7 @@ mod tests { .add( ttx_0_too_expensive_0, current_account_nonce, - &account_balances + &account_balances, ) .unwrap_err(), InsertionError::AccountBalanceTooLow @@ -1162,7 +1235,7 @@ mod tests { .add( ttx_0_too_expensive_1, current_account_nonce, - &account_balances + &account_balances, ) .unwrap_err(), InsertionError::AccountBalanceTooLow @@ -1247,7 +1320,7 @@ mod tests { // remove from start will remove all assert_eq!( account_txs.remove(0), - vec![ttx_0.tx_hash, ttx_1.tx_hash, ttx_2.tx_hash,], + vec![ttx_0.tx_hash, ttx_1.tx_hash, ttx_2.tx_hash], "three transactions should've been removed" ); assert!(account_txs.txs().is_empty()); @@ -1846,7 +1919,7 @@ mod tests { #[tokio::test] async fn parked_transactions_find_promotables() { - let mut parked_txs = ParkedTransactions::::new(TX_TTL); + let mut parked_txs = ParkedTransactions::::new(TX_TTL, 100); // transactions to add to accounts let ttx_1 = MockTTXBuilder::new() @@ -2045,4 +2118,33 @@ mod tests { &90 ); } + + #[tokio::test] + async fn parked_transactions_size_limit_works() { + let mut parked_txs = ParkedTransactions::::new(TX_TTL, 1); + + // transactions to add to account + let ttx_1 = MockTTXBuilder::new().nonce(1).build(); + let ttx_2 = MockTTXBuilder::new().nonce(2).build(); + let account_balances_full = mock_balances(100, 100); + + // under limit okay + parked_txs + .add(ttx_1.clone(), 1, &account_balances_full) + .unwrap(); + + // growing past limit causes error + assert_eq!( + parked_txs + .add(ttx_2.clone(), 0, &account_balances_full) + .unwrap_err(), + InsertionError::ParkedSizeLimit, + "size limit should be enforced" + ); + + // removing transactions makes space for new ones + parked_txs.remove(ttx_1.signed_tx).unwrap(); + // adding should now be okay + parked_txs.add(ttx_2, 0, &account_balances_full).unwrap(); + } } diff --git a/crates/astria-sequencer/src/metrics.rs b/crates/astria-sequencer/src/metrics.rs index dbb3858ef7..a59f4d33bf 100644 --- a/crates/astria-sequencer/src/metrics.rs +++ b/crates/astria-sequencer/src/metrics.rs @@ -37,6 +37,7 @@ pub struct Metrics { actions_per_transaction_in_mempool: Histogram, transaction_in_mempool_size_bytes: Histogram, transactions_in_mempool_total: Gauge, + transactions_in_mempool_parked: Gauge, mempool_recosted: Counter, internal_logic_error: Counter, } @@ -152,6 +153,10 @@ impl Metrics { self.transactions_in_mempool_total.set(count); } + pub(crate) fn set_transactions_in_mempool_parked(&self, count: usize) { + self.transactions_in_mempool_parked.set(count); + } + pub(crate) fn increment_mempool_recosted(&self) { self.mempool_recosted.increment(1); } @@ -328,6 +333,13 @@ impl telemetry::Metrics for Metrics { )? .register()?; + let transactions_in_mempool_parked = builder + .new_gauge_factory( + TRANSACTIONS_IN_MEMPOOL_PARKED, + "The number of transactions parked in the app mempool", + )? + .register()?; + let mempool_recosted = builder .new_counter_factory( MEMPOOL_RECOSTED, @@ -368,6 +380,7 @@ impl telemetry::Metrics for Metrics { actions_per_transaction_in_mempool, transaction_in_mempool_size_bytes, transactions_in_mempool_total, + transactions_in_mempool_parked, mempool_recosted, internal_logic_error, }) @@ -396,6 +409,7 @@ metric_names!(const METRICS_NAMES: ACTIONS_PER_TRANSACTION_IN_MEMPOOL, TRANSACTION_IN_MEMPOOL_SIZE_BYTES, TRANSACTIONS_IN_MEMPOOL_TOTAL, + TRANSACTIONS_IN_MEMPOOL_PARKED, MEMPOOL_RECOSTED, INTERNAL_LOGIC_ERROR ); @@ -419,6 +433,7 @@ mod tests { PROCESS_PROPOSAL_SKIPPED_PROPOSAL, PROPOSAL_DEPOSITS, PROPOSAL_TRANSACTIONS, + TRANSACTIONS_IN_MEMPOOL_PARKED, TRANSACTIONS_IN_MEMPOOL_TOTAL, TRANSACTION_IN_MEMPOOL_SIZE_BYTES, }; @@ -482,6 +497,10 @@ mod tests { TRANSACTIONS_IN_MEMPOOL_TOTAL, "transactions_in_mempool_total", ); + assert_const( + TRANSACTIONS_IN_MEMPOOL_PARKED, + "transactions_in_mempool_parked", + ); assert_const(MEMPOOL_RECOSTED, "mempool_recosted"); assert_const(INTERNAL_LOGIC_ERROR, "internal_logic_error"); } diff --git a/crates/astria-sequencer/src/sequencer.rs b/crates/astria-sequencer/src/sequencer.rs index a6df51421b..575734de60 100644 --- a/crates/astria-sequencer/src/sequencer.rs +++ b/crates/astria-sequencer/src/sequencer.rs @@ -84,7 +84,7 @@ impl Sequencer { .wrap_err("failed to load storage backing chain state")?; let snapshot = storage.latest_snapshot(); - let mempool = Mempool::new(metrics); + let mempool = Mempool::new(metrics, config.mempool_parked_max_tx_count); let app = App::new(snapshot, mempool.clone(), metrics) .await .wrap_err("failed to initialize app")?; diff --git a/crates/astria-sequencer/src/service/consensus.rs b/crates/astria-sequencer/src/service/consensus.rs index 4e1868ceff..08a9523aef 100644 --- a/crates/astria-sequencer/src/service/consensus.rs +++ b/crates/astria-sequencer/src/service/consensus.rs @@ -471,7 +471,7 @@ mod tests { let storage = cnidarium::TempStorage::new().await.unwrap(); let snapshot = storage.latest_snapshot(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mempool = Mempool::new(metrics); + let mempool = Mempool::new(metrics, 100); let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); app.init_chain(storage.clone(), genesis_state, vec![], "test".to_string()) .await diff --git a/crates/astria-sequencer/src/service/mempool/mod.rs b/crates/astria-sequencer/src/service/mempool/mod.rs index 425d199798..dcaff32aaf 100644 --- a/crates/astria-sequencer/src/service/mempool/mod.rs +++ b/crates/astria-sequencer/src/service/mempool/mod.rs @@ -126,6 +126,12 @@ impl IntoCheckTxResponse for InsertionError { log: InsertionError::AccountSizeLimit.to_string(), ..response::CheckTx::default() }, + InsertionError::ParkedSizeLimit => response::CheckTx { + code: Code::Err(AbciErrorCode::PARKED_FULL.value()), + info: AbciErrorCode::PARKED_FULL.info(), + log: "transaction failed insertion because parked container is full".into(), + ..response::CheckTx::default() + }, InsertionError::AccountBalanceTooLow | InsertionError::NonceGap => { // NOTE: these are handled interally by the mempool and don't // block transaction inclusion in the mempool. they shouldn't diff --git a/crates/astria-sequencer/src/service/mempool/tests.rs b/crates/astria-sequencer/src/service/mempool/tests.rs index b6511e9c67..b43f7865b5 100644 --- a/crates/astria-sequencer/src/service/mempool/tests.rs +++ b/crates/astria-sequencer/src/service/mempool/tests.rs @@ -28,7 +28,7 @@ async fn future_nonces_are_accepted() { let snapshot = storage.latest_snapshot(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mut mempool = Mempool::new(metrics); + let mut mempool = Mempool::new(metrics, 100); let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); let genesis_state = crate::app::test_utils::genesis_state(); @@ -58,7 +58,7 @@ async fn rechecks_pass() { let snapshot = storage.latest_snapshot(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mut mempool = Mempool::new(metrics); + let mut mempool = Mempool::new(metrics, 100); let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); let genesis_state = crate::app::test_utils::genesis_state(); @@ -96,7 +96,7 @@ async fn can_reinsert_after_recheck_fail() { let snapshot = storage.latest_snapshot(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mut mempool = Mempool::new(metrics); + let mut mempool = Mempool::new(metrics, 100); let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); let genesis_state = crate::app::test_utils::genesis_state(); @@ -144,7 +144,7 @@ async fn recheck_adds_non_tracked_tx() { let snapshot = storage.latest_snapshot(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mut mempool = Mempool::new(metrics); + let mut mempool = Mempool::new(metrics, 100); let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); let genesis_state = crate::app::test_utils::genesis_state();