diff --git a/crates/astria-core/src/protocol/abci.rs b/crates/astria-core/src/protocol/abci.rs index 76c7bc8c8a..22339e4dec 100644 --- a/crates/astria-core/src/protocol/abci.rs +++ b/crates/astria-core/src/protocol/abci.rs @@ -18,6 +18,8 @@ impl AbciErrorCode { pub const INSUFFICIENT_FUNDS: Self = Self(6); pub const INVALID_CHAIN_ID: Self = Self(7); pub const VALUE_NOT_FOUND: Self = Self(8); + pub const TRANSACTION_EXPIRED: Self = Self(9); + pub const TRANSACTION_FAILED: Self = Self(10); } impl AbciErrorCode { @@ -33,6 +35,8 @@ impl AbciErrorCode { 6 => "insufficient funds".into(), 7 => "the provided chain id was invalid".into(), 8 => "the requested value was not found".into(), + 9 => "the transaction expired in the app's mempool".into(), + 10 => "the transaction failed to execute in prepare_proposal()".into(), other => format!("unknown non-zero abci error code: {other}").into(), } } @@ -61,6 +65,8 @@ impl From for AbciErrorCode { 6 => Self::INSUFFICIENT_FUNDS, 7 => Self::INVALID_CHAIN_ID, 8 => Self::VALUE_NOT_FOUND, + 9 => Self::TRANSACTION_EXPIRED, + 10 => Self::TRANSACTION_FAILED, other => Self(other), } } diff --git a/crates/astria-sequencer/Cargo.toml b/crates/astria-sequencer/Cargo.toml index bd282d7fb0..61f7527ac7 100644 --- a/crates/astria-sequencer/Cargo.toml +++ b/crates/astria-sequencer/Cargo.toml @@ -67,6 +67,7 @@ config = { package = "astria-config", path = "../astria-config", features = [ "tests", ] } insta = { workspace = true, features = ["json"] } +tokio = { workspace = true, features = ["test-util"] } [build-dependencies] astria-build-info = { path = "../astria-build-info", features = ["build"] } diff --git a/crates/astria-sequencer/src/app/mod.rs b/crates/astria-sequencer/src/app/mod.rs index e8aef971e7..835ed045ea 100644 --- a/crates/astria-sequencer/src/app/mod.rs +++ b/crates/astria-sequencer/src/app/mod.rs @@ -88,7 +88,10 @@ use crate::{ component::Component as _, genesis::GenesisState, ibc::component::IbcComponent, - mempool::Mempool, + mempool::{ + Mempool, + RemovalReason, + }, metrics::Metrics, proposal::{ block_size_constraints::BlockSizeConstraints, @@ -547,12 +550,20 @@ impl App { ); failed_tx_count = failed_tx_count.saturating_add(1); - // we re-insert the tx into the mempool if it failed to execute - // due to an invalid nonce, as it may be valid in the future. - // if it's invalid due to the nonce being too low, it'll be - // removed from the mempool in `update_mempool_after_finalization`. if e.downcast_ref::().is_some() { + // we re-insert the tx into the mempool if it failed to execute + // due to an invalid nonce, as it may be valid in the future. + // if it's invalid due to the nonce being too low, it'll be + // removed from the mempool in `update_mempool_after_finalization`. txs_to_readd_to_mempool.push((enqueued_tx, priority)); + } else { + // the transaction should be removed from the cometbft mempool + self.mempool + .track_removal_comet_bft( + enqueued_tx.tx_hash(), + RemovalReason::FailedPrepareProposal(e.to_string()), + ) + .await; } } } @@ -1119,9 +1130,7 @@ async fn update_mempool_after_finalization( state: S, ) -> anyhow::Result<()> { let current_account_nonce_getter = |address: Address| state.get_account_nonce(address); - mempool - .update_priorities(current_account_nonce_getter) - .await + mempool.run_maintenance(current_account_nonce_getter).await } /// relevant data of a block being executed. diff --git a/crates/astria-sequencer/src/mempool.rs b/crates/astria-sequencer/src/mempool.rs index 1da336b3c1..490a4739df 100644 --- a/crates/astria-sequencer/src/mempool.rs +++ b/crates/astria-sequencer/src/mempool.rs @@ -3,8 +3,12 @@ use std::{ self, Ordering, }, - collections::HashMap, + collections::{ + HashMap, + VecDeque, + }, future::Future, + num::NonZeroUsize, sync::{ Arc, OnceLock, @@ -22,7 +26,13 @@ use astria_core::{ }, }; use priority_queue::PriorityQueue; -use tokio::sync::RwLock; +use tokio::{ + sync::RwLock, + time::{ + Duration, + Instant, + }, +}; use tracing::debug; type MempoolQueue = PriorityQueue; @@ -34,6 +44,7 @@ type MempoolQueue = PriorityQueue; #[derive(Clone, Debug)] pub(crate) struct TransactionPriority { nonce_diff: u32, + time_first_seen: Instant, } impl PartialEq for TransactionPriority { @@ -75,7 +86,11 @@ impl EnqueuedTransaction { } } - fn priority(&self, current_account_nonce: u32) -> anyhow::Result { + fn priority( + &self, + current_account_nonce: u32, + time_first_seen: Option, + ) -> anyhow::Result { let Some(nonce_diff) = self.signed_tx.nonce().checked_sub(current_account_nonce) else { return Err(anyhow::anyhow!( "transaction nonce {} is less than current account nonce {current_account_nonce}", @@ -85,6 +100,7 @@ impl EnqueuedTransaction { Ok(TransactionPriority { nonce_diff, + time_first_seen: time_first_seen.unwrap_or(Instant::now()), }) } @@ -117,6 +133,62 @@ impl std::hash::Hash for EnqueuedTransaction { } } +#[derive(Debug, Clone)] +pub(crate) enum RemovalReason { + Expired, + FailedPrepareProposal(String), +} + +const TX_TTL: Duration = Duration::from_secs(600); // 10 minutes +const REMOVAL_CACHE_SIZE: usize = 4096; + +/// `RemovalCache` is used to signal to `CometBFT` that a +/// transaction can be removed from the `CometBFT` mempool. +/// +/// This is useful for when a transaction fails execution or when a transaction +/// has expired in the app's mempool. +#[derive(Clone)] +pub(crate) struct RemovalCache { + cache: HashMap<[u8; 32], RemovalReason>, + remove_queue: VecDeque<[u8; 32]>, + max_size: NonZeroUsize, +} + +impl RemovalCache { + fn new(max_size: NonZeroUsize) -> Self { + Self { + cache: HashMap::new(), + remove_queue: VecDeque::with_capacity(max_size.into()), + max_size, + } + } + + /// returns Some(RemovalReason) if transaction is cached and + /// removes the entry from the cache at the same time + fn remove(&mut self, tx_hash: [u8; 32]) -> Option { + self.cache.remove(&tx_hash) + } + + /// adds the transaction to the cache + fn add(&mut self, tx_hash: [u8; 32], reason: RemovalReason) { + if self.cache.contains_key(&tx_hash) { + return; + }; + + if self.remove_queue.len() == usize::from(self.max_size) { + // make space for the new transaction by removing the oldest transaction + let removed_tx = self + .remove_queue + .pop_front() + .expect("cache should contain elements"); + // remove transaction from cache if it is present + self.cache.remove(&removed_tx); + } + self.remove_queue.push_back(tx_hash); + self.cache.insert(tx_hash, reason); + } +} + /// [`Mempool`] is an internally-synchronized wrapper around a prioritized queue of transactions /// awaiting execution. /// @@ -129,51 +201,92 @@ impl std::hash::Hash for EnqueuedTransaction { /// - transaction expiration #[derive(Clone)] pub(crate) struct Mempool { - inner: Arc>, + queue: Arc>, + comet_bft_removal_cache: Arc>, + tx_ttl: Duration, } impl Mempool { #[must_use] pub(crate) fn new() -> Self { Self { - inner: Arc::new(RwLock::new(MempoolQueue::new())), + queue: Arc::new(RwLock::new(MempoolQueue::new())), + comet_bft_removal_cache: Arc::new(RwLock::new(RemovalCache::new( + NonZeroUsize::try_from(REMOVAL_CACHE_SIZE) + .expect("Removal cache cannot be zero sized"), + ))), + tx_ttl: TX_TTL, } } /// returns the number of transactions in the mempool #[must_use] pub(crate) async fn len(&self) -> usize { - self.inner.read().await.len() + self.queue.read().await.len() } /// inserts a transaction into the mempool /// - /// note: if the tx already exists in the mempool, it's overwritten with the new priority. + /// note: the oldest timestamp from found priorities is maintained. pub(crate) async fn insert( &self, tx: SignedTransaction, current_account_nonce: u32, ) -> anyhow::Result<()> { let enqueued_tx = EnqueuedTransaction::new(tx); - let priority = enqueued_tx.priority(current_account_nonce)?; - let tx_hash = enqueued_tx.tx_hash; - self.inner.write().await.push(enqueued_tx, priority); - tracing::trace!( - tx_hash = %telemetry::display::hex(&tx_hash), - "inserted transaction into mempool" - ); + let fresh_priority = enqueued_tx.priority(current_account_nonce, None)?; + Self::update_or_insert(&mut *self.queue.write().await, enqueued_tx, &fresh_priority); + Ok(()) } /// inserts all the given transactions into the mempool + /// + /// note: the oldest timestamp from found priorities for an `EnqueuedTransaction` is maintained. pub(crate) async fn insert_all(&self, txs: Vec<(EnqueuedTransaction, TransactionPriority)>) { - self.inner.write().await.extend(txs); + let mut queue = self.queue.write().await; + + for (enqueued_tx, priority) in txs { + Self::update_or_insert(&mut queue, enqueued_tx, &priority); + } + } + + /// inserts or updates the transaction in a timestamp preserving manner + /// + /// note: updates the priority using the `possible_priority`'s nonce diff. + fn update_or_insert( + queue: &mut PriorityQueue, + enqueued_tx: EnqueuedTransaction, + possible_priority: &TransactionPriority, + ) { + let oldest_timestamp = queue.get_priority(&enqueued_tx).map_or( + possible_priority.time_first_seen, + |prev_priority| { + possible_priority + .time_first_seen + .min(prev_priority.time_first_seen) + }, + ); + + let priority = TransactionPriority { + nonce_diff: possible_priority.nonce_diff, + time_first_seen: oldest_timestamp, + }; + + let tx_hash = enqueued_tx.tx_hash; + if queue.push(enqueued_tx, priority).is_none() { + // emit if didn't already exist + tracing::trace!( + tx_hash = %telemetry::display::hex(&tx_hash), + "inserted transaction into mempool" + ); + } } /// pops the transaction with the highest priority from the mempool #[must_use] pub(crate) async fn pop(&self) -> Option<(EnqueuedTransaction, TransactionPriority)> { - self.inner.write().await.pop() + self.queue.write().await.pop() } /// removes a transaction from the mempool @@ -184,7 +297,24 @@ impl Mempool { signed_tx, address, }; - self.inner.write().await.remove(&enqueued_tx); + self.queue.write().await.remove(&enqueued_tx); + } + + /// signal that the transaction should be removed from the `CometBFT` mempool + pub(crate) async fn track_removal_comet_bft(&self, tx_hash: [u8; 32], reason: RemovalReason) { + self.comet_bft_removal_cache + .write() + .await + .add(tx_hash, reason); + } + + /// checks if a transaction was flagged to be removed from the `CometBFT` mempool + /// and removes entry + pub(crate) async fn check_removed_comet_bft( + &mut self, + tx_hash: [u8; 32], + ) -> Option { + self.comet_bft_removal_cache.write().await.remove(tx_hash) } /// Updates the priority of the txs in the mempool based on the current state, and removes any @@ -192,7 +322,7 @@ impl Mempool { /// /// *NOTE*: this function locks the mempool until every tx has been checked. This could /// potentially stall consensus from moving to the next round if the mempool is large. - pub(crate) async fn update_priorities( + pub(crate) async fn run_maintenance( &self, current_account_nonce_getter: F, ) -> anyhow::Result<()> @@ -203,9 +333,19 @@ impl Mempool { let mut txs_to_remove = Vec::new(); let mut current_account_nonces = HashMap::new(); - let mut queue = self.inner.write().await; + let mut queue = self.queue.write().await; + let mut removal_cache = self.comet_bft_removal_cache.write().await; for (enqueued_tx, priority) in queue.iter_mut() { let address = enqueued_tx.address(); + + // check if the transactions has expired + if priority.time_first_seen.elapsed() > self.tx_ttl { + // tx has expired, set to remove and add to removal cache + txs_to_remove.push(enqueued_tx.clone()); + removal_cache.add(enqueued_tx.tx_hash, RemovalReason::Expired); + continue; + } + // Try to get the current account nonce from the ones already retrieved. let current_account_nonce = if let Some(nonce) = current_account_nonces.get(&address) { *nonce @@ -217,7 +357,7 @@ impl Mempool { current_account_nonces.insert(address, nonce); nonce }; - match enqueued_tx.priority(current_account_nonce) { + match enqueued_tx.priority(current_account_nonce, Some(priority.time_first_seen)) { Ok(new_priority) => *priority = new_priority, Err(e) => { debug!( @@ -240,7 +380,7 @@ impl Mempool { /// returns the pending nonce for the given address, /// if it exists in the mempool. pub(crate) async fn pending_nonce(&self, address: &Address) -> Option { - let inner = self.inner.read().await; + let inner = self.queue.read().await; let mut nonce = None; for (tx, _priority) in inner.iter() { if tx.address() == address { @@ -280,9 +420,12 @@ fn dummy_signed_tx() -> (Arc, Address) { #[cfg(test)] mod test { - use std::hash::{ - Hash, - Hasher, + use std::{ + hash::{ + Hash, + Hasher, + }, + time::Duration, }; use super::*; @@ -291,7 +434,7 @@ mod test { #[test] fn transaction_priority_should_error_if_invalid() { let enqueued_tx = EnqueuedTransaction::new(get_mock_tx(0)); - let priority = enqueued_tx.priority(1); + let priority = enqueued_tx.priority(1, None); assert!( priority .unwrap_err() @@ -307,9 +450,11 @@ mod test { fn transaction_priority_comparisons_should_be_consistent() { let high = TransactionPriority { nonce_diff: 0, + time_first_seen: Instant::now(), }; let low = TransactionPriority { nonce_diff: 1, + time_first_seen: Instant::now(), }; assert!(high.partial_cmp(&high) == Some(Ordering::Equal)); @@ -422,7 +567,7 @@ mod test { .map(|index| { let enqueued_tx = EnqueuedTransaction::new(get_mock_tx(u32::try_from(index).unwrap())); - let priority = enqueued_tx.priority(current_account_nonce).unwrap(); + let priority = enqueued_tx.priority(current_account_nonce, None).unwrap(); (enqueued_tx, priority) }) .collect(); @@ -497,7 +642,7 @@ mod test { // Update the priorities. Alice's first tx (with nonce 0) and other's first (with nonce // 100) should both get purged. mempool - .update_priorities(current_account_nonce_getter) + .run_maintenance(current_account_nonce_getter) .await .unwrap(); @@ -522,6 +667,73 @@ mod test { assert_eq!(priority.nonce_diff, 1); } + #[tokio::test(start_paused = true)] + async fn transaction_timestamp_not_overwritten_insert() { + let mempool = Mempool::new(); + + let insert_time = Instant::now(); + let tx = get_mock_tx(0); + mempool.insert(tx.clone(), 0).await.unwrap(); + + // pass time + tokio::time::advance(Duration::from_secs(60)).await; + assert_eq!( + insert_time.elapsed(), + Duration::from_secs(60), + "time should have advanced" + ); + + // re-insert + mempool.insert(tx, 0).await.unwrap(); + + // check that the timestamp was not overwritten in insert() + let (_, tx_priority) = mempool + .pop() + .await + .expect("transaction was added, should exist"); + assert_eq!( + tx_priority.time_first_seen.duration_since(insert_time), + Duration::from_secs(0), + "Tracked time should be the same" + ); + } + + #[tokio::test(start_paused = true)] + async fn transaction_timestamp_not_overwritten_insert_all() { + let mempool = Mempool::new(); + + let insert_time = Instant::now(); + let tx = get_mock_tx(0); + mempool.insert(tx.clone(), 0).await.unwrap(); + + // pass time + tokio::time::advance(Duration::from_secs(60)).await; + assert_eq!( + insert_time.elapsed(), + Duration::from_secs(60), + "time should have advanced" + ); + + // re-insert with new priority with higher timestamp + let enqueued_tx = EnqueuedTransaction::new(tx); + let new_priority = TransactionPriority { + nonce_diff: 0, + time_first_seen: Instant::now(), + }; + mempool.insert_all(vec![(enqueued_tx, new_priority)]).await; + + // check that the timestamp was not overwritten in insert() + let (_, tx_priority) = mempool + .pop() + .await + .expect("transaction was added, should exist"); + assert_eq!( + tx_priority.time_first_seen.duration_since(insert_time), + Duration::from_secs(0), + "Tracked time should be the same" + ); + } + #[tokio::test] async fn should_get_pending_nonce() { let mempool = Mempool::new(); @@ -565,6 +777,47 @@ mod test { ); } + #[tokio::test] + async fn tx_cache_size() { + let mut tx_cache = RemovalCache::new(NonZeroUsize::try_from(2).unwrap()); + + let tx_0 = [0u8; 32]; + let tx_1 = [1u8; 32]; + let tx_2 = [2u8; 32]; + + assert!( + tx_cache.remove(tx_0).is_none(), + "no transaction should be cached at first" + ); + + tx_cache.add(tx_0, RemovalReason::Expired); + assert!( + tx_cache.remove(tx_0).is_some(), + "transaction was added, should be cached" + ); + + assert!( + tx_cache.remove(tx_0).is_none(), + "transaction is cleared after reading" + ); + + tx_cache.add(tx_0, RemovalReason::Expired); + tx_cache.add(tx_1, RemovalReason::Expired); + tx_cache.add(tx_2, RemovalReason::Expired); + assert!( + tx_cache.remove(tx_1).is_some(), + "second transaction was added, should be cached" + ); + assert!( + tx_cache.remove(tx_2).is_some(), + "third transaction was added, should be cached" + ); + assert!( + tx_cache.remove(tx_0).is_none(), + "first transaction should not be cached" + ); + } + #[test] fn enqueued_transaction_can_be_instantiated() { // This just tests that the constructor does not fail. diff --git a/crates/astria-sequencer/src/metrics.rs b/crates/astria-sequencer/src/metrics.rs index b170b8dcd3..4999ead0ec 100644 --- a/crates/astria-sequencer/src/metrics.rs +++ b/crates/astria-sequencer/src/metrics.rs @@ -22,6 +22,8 @@ pub(crate) struct Metrics { proposal_transactions: Histogram, process_proposal_skipped_proposal: Counter, check_tx_removed_too_large: Counter, + check_tx_removed_expired: Counter, + check_tx_removed_failed_execution: Counter, check_tx_removed_failed_stateless: Counter, check_tx_removed_stale_nonce: Counter, check_tx_removed_account_balance: Counter, @@ -29,6 +31,7 @@ pub(crate) struct Metrics { impl Metrics { #[must_use] + #[allow(clippy::too_many_lines)] pub(crate) fn new() -> Self { describe_counter!( PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_DECODE_FAILURE, @@ -126,6 +129,22 @@ impl Metrics { ); let check_tx_removed_account_balance = counter!(CHECK_TX_REMOVED_ACCOUNT_BALANCE); + describe_counter!( + CHECK_TX_REMOVED_FAILED_EXECUTION, + Unit::Count, + "The number of transactions that have been removed from the mempool due to failing \ + execution in prepare_proposal()" + ); + let check_tx_removed_failed_execution = counter!(CHECK_TX_REMOVED_FAILED_EXECUTION); + + describe_counter!( + CHECK_TX_REMOVED_EXPIRED, + Unit::Count, + "The number of transactions that have been removed from the mempool due to expiring \ + in the app's mempool" + ); + let check_tx_removed_expired = counter!(CHECK_TX_REMOVED_EXPIRED); + Self { prepare_proposal_excluded_transactions_decode_failure, prepare_proposal_excluded_transactions_cometbft_space, @@ -136,6 +155,8 @@ impl Metrics { proposal_transactions, process_proposal_skipped_proposal, check_tx_removed_too_large, + check_tx_removed_expired, + check_tx_removed_failed_execution, check_tx_removed_failed_stateless, check_tx_removed_stale_nonce, check_tx_removed_account_balance, @@ -186,6 +207,14 @@ impl Metrics { self.check_tx_removed_too_large.increment(1); } + pub(crate) fn increment_check_tx_removed_expired(&self) { + self.check_tx_removed_expired.increment(1); + } + + pub(crate) fn increment_check_tx_removed_failed_execution(&self) { + self.check_tx_removed_failed_execution.increment(1); + } + pub(crate) fn increment_check_tx_removed_failed_stateless(&self) { self.check_tx_removed_failed_stateless.increment(1); } @@ -209,15 +238,19 @@ metric_names!(pub const METRICS_NAMES: PROPOSAL_TRANSACTIONS, PROCESS_PROPOSAL_SKIPPED_PROPOSAL, CHECK_TX_REMOVED_TOO_LARGE, + CHECK_TX_REMOVED_EXPIRED, + CHECK_TX_REMOVED_FAILED_EXECUTION, CHECK_TX_REMOVED_FAILED_STATELESS, CHECK_TX_REMOVED_STALE_NONCE, - CHECK_TX_REMOVED_ACCOUNT_BALANCE + CHECK_TX_REMOVED_ACCOUNT_BALANCE, ); #[cfg(test)] mod tests { use super::{ CHECK_TX_REMOVED_ACCOUNT_BALANCE, + CHECK_TX_REMOVED_EXPIRED, + CHECK_TX_REMOVED_FAILED_EXECUTION, CHECK_TX_REMOVED_FAILED_STATELESS, CHECK_TX_REMOVED_STALE_NONCE, CHECK_TX_REMOVED_TOO_LARGE, @@ -268,6 +301,11 @@ mod tests { "process_proposal_skipped_proposal", ); assert_const(CHECK_TX_REMOVED_TOO_LARGE, "check_tx_removed_too_large"); + assert_const(CHECK_TX_REMOVED_EXPIRED, "check_tx_removed_expired"); + assert_const( + CHECK_TX_REMOVED_FAILED_EXECUTION, + "check_tx_removed_failed_execution", + ); assert_const( CHECK_TX_REMOVED_FAILED_STATELESS, "check_tx_removed_failed_stateless", diff --git a/crates/astria-sequencer/src/service/mempool.rs b/crates/astria-sequencer/src/service/mempool.rs index cdb7ce3999..a38389123f 100644 --- a/crates/astria-sequencer/src/service/mempool.rs +++ b/crates/astria-sequencer/src/service/mempool.rs @@ -31,7 +31,10 @@ use tracing::Instrument as _; use crate::{ accounts::state_ext::StateReadExt, - mempool::Mempool as AppMempool, + mempool::{ + Mempool as AppMempool, + RemovalReason, + }, metrics::Metrics, transaction, }; @@ -191,6 +194,31 @@ async fn handle_check_tx( }; }; + if let Some(removal_reason) = mempool.check_removed_comet_bft(tx_hash).await { + mempool.remove(tx_hash).await; + + match removal_reason { + RemovalReason::Expired => { + metrics.increment_check_tx_removed_expired(); + return response::CheckTx { + code: AbciErrorCode::TRANSACTION_EXPIRED.into(), + info: "transaction expired in app's mempool".into(), + log: "Transaction expired in the app's mempool".into(), + ..response::CheckTx::default() + }; + } + RemovalReason::FailedPrepareProposal(err) => { + metrics.increment_check_tx_removed_failed_execution(); + return response::CheckTx { + code: AbciErrorCode::TRANSACTION_FAILED.into(), + info: "transaction failed execution in prepare_proposal()".into(), + log: format!("transaction failed execution because: {err}"), + ..response::CheckTx::default() + }; + } + } + }; + // tx is valid, push to mempool let current_account_nonce = state .get_account_nonce(crate::astria_address(