diff --git a/Cargo.lock b/Cargo.lock index f1cf124250..1fc2114a9b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -60,6 +60,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", + "getrandom 0.2.15", "once_cell", "version_check", "zerocopy", @@ -814,6 +815,7 @@ dependencies = [ "penumbra-tower-trace", "pin-project-lite", "prost", + "quick_cache", "rand 0.8.5", "rand_chacha 0.3.1", "regex", @@ -6173,6 +6175,18 @@ dependencies = [ "byteorder", ] +[[package]] +name = "quick_cache" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e00e03be638aaab399c951dba6bea0161f99f26df0ccab4ab0fc6eb9535bd48" +dependencies = [ + "ahash", + "equivalent", + "hashbrown 0.14.5", + "parking_lot", +] + [[package]] name = "quote" version = "1.0.36" diff --git a/crates/astria-sequencer/Cargo.toml b/crates/astria-sequencer/Cargo.toml index 60a35107f8..f5e6d68c54 100644 --- a/crates/astria-sequencer/Cargo.toml +++ b/crates/astria-sequencer/Cargo.toml @@ -34,6 +34,7 @@ cnidarium = { git = "https://github.com/penumbra-zone/penumbra.git", rev = "87ad ] } ibc-proto = { version = "0.41.0", features = ["server"] } matchit = "0.7.2" +quick_cache = "0.6.0" tower = "0.4" tower-abci = "0.12.0" tower-actor = "0.1.0" diff --git a/crates/astria-sequencer/src/metrics.rs b/crates/astria-sequencer/src/metrics.rs index d051c68cfc..c47d7dc467 100644 --- a/crates/astria-sequencer/src/metrics.rs +++ b/crates/astria-sequencer/src/metrics.rs @@ -34,6 +34,8 @@ pub struct Metrics { check_tx_duration_seconds_fetch_balances: Histogram, check_tx_duration_seconds_fetch_tx_cost: Histogram, check_tx_duration_seconds_insert_to_app_mempool: Histogram, + check_tx_cache_hit: Counter, + check_tx_cache_miss: Counter, actions_per_transaction_in_mempool: Histogram, transaction_in_mempool_size_bytes: Histogram, transactions_in_mempool_total: Gauge, @@ -138,6 +140,14 @@ impl Metrics { .record(duration); } + pub(crate) fn increment_check_tx_cache_hit(&self) { + self.check_tx_cache_hit.increment(1); + } + + pub(crate) fn increment_check_tx_cache_miss(&self) { + self.check_tx_cache_miss.increment(1); + } + pub(crate) fn record_actions_per_transaction_in_mempool(&self, count: usize) { self.actions_per_transaction_in_mempool.record(count); } @@ -295,6 +305,21 @@ impl telemetry::Metrics for Metrics { let check_tx_duration_seconds_insert_to_app_mempool = check_tx_duration_factory .register_with_labels(&[(CHECK_TX_STAGE, "insert to app mempool".to_string())])?; + let check_tx_cache_hit = builder + .new_counter_factory( + CHECK_TX_CACHE_HIT, + "The number of `check_tx` attempts which have been retrieved from the cached \ + results", + )? + .register()?; + + let check_tx_cache_miss = builder + .new_counter_factory( + CHECK_TX_CACHE_MISS, + "The number of `check_tx` rechecks which have not been found in the cached results", + )? + .register()?; + let actions_per_transaction_in_mempool = builder .new_histogram_factory( ACTIONS_PER_TRANSACTION_IN_MEMPOOL, @@ -345,6 +370,8 @@ impl telemetry::Metrics for Metrics { check_tx_duration_seconds_fetch_balances, check_tx_duration_seconds_fetch_tx_cost, check_tx_duration_seconds_insert_to_app_mempool, + check_tx_cache_hit, + check_tx_cache_miss, actions_per_transaction_in_mempool, transaction_in_mempool_size_bytes, transactions_in_mempool_total, @@ -371,6 +398,8 @@ metric_names!(const METRICS_NAMES: CHECK_TX_DURATION_SECONDS_CONVERT_ADDRESS, CHECK_TX_DURATION_SECONDS_FETCH_BALANCES, CHECK_TX_DURATION_SECONDS_FETCH_TX_COST, + CHECK_TX_CACHE_HIT, + CHECK_TX_CACHE_MISS, ACTIONS_PER_TRANSACTION_IN_MEMPOOL, TRANSACTION_IN_MEMPOOL_SIZE_BYTES, TRANSACTIONS_IN_MEMPOOL_TOTAL, @@ -381,6 +410,8 @@ metric_names!(const METRICS_NAMES: mod tests { use super::{ ACTIONS_PER_TRANSACTION_IN_MEMPOOL, + CHECK_TX_CACHE_HIT, + CHECK_TX_CACHE_MISS, CHECK_TX_DURATION_SECONDS, CHECK_TX_REMOVED_ACCOUNT_BALANCE, CHECK_TX_REMOVED_EXPIRED, @@ -448,6 +479,8 @@ mod tests { "check_tx_removed_account_balance", ); assert_const(CHECK_TX_DURATION_SECONDS, "check_tx_duration_seconds"); + assert_const(CHECK_TX_CACHE_HIT, "check_tx_cache_hit"); + assert_const(CHECK_TX_CACHE_MISS, "check_tx_cache_miss"); assert_const( ACTIONS_PER_TRANSACTION_IN_MEMPOOL, "actions_per_transaction_in_mempool", diff --git a/crates/astria-sequencer/src/service/mempool.rs b/crates/astria-sequencer/src/service/mempool.rs index 9dc31e0f02..ddb16a4cd0 100644 --- a/crates/astria-sequencer/src/service/mempool.rs +++ b/crates/astria-sequencer/src/service/mempool.rs @@ -11,41 +11,50 @@ use std::{ use astria_core::{ generated::protocol::transactions::v1alpha1 as raw, - primitive::v1::asset::IbcPrefixed, + primitive::v1::{ + asset::IbcPrefixed, + Address, + }, protocol::{ abci::AbciErrorCode, transaction::v1alpha1::SignedTransaction, }, }; -use astria_eyre::eyre::WrapErr as _; -use cnidarium::Storage; +use bytes::Bytes; +use cnidarium::{ + StateRead, + Storage, +}; use futures::{ Future, FutureExt, }; use prost::Message as _; -use tendermint::{ - abci::Code, - v0_38::abci::{ - request, - response, - MempoolRequest, - MempoolResponse, - }, +use quick_cache::sync::Cache; +use sha2::Digest as _; +use tendermint::v0_38::abci::{ + request, + response, + MempoolRequest, + MempoolResponse, }; use tower::Service; use tower_abci::BoxError; use tracing::{ instrument, + warn, Instrument as _, }; use crate::{ accounts, - address, + address::{ + self, + StateReadExt as _, + }, app::ActionHandler as _, mempool::{ - get_account_balances, + self, Mempool as AppMempool, RemovalReason, }, @@ -54,15 +63,21 @@ use crate::{ }; const MAX_TX_SIZE: usize = 256_000; // 256 KB +/// The number of entries in the immutable checks cache. +const CACHE_SIZE: usize = 10_000; -/// Mempool handles [`request::CheckTx`] abci requests. -// -/// It performs a stateless check of the given transaction, -/// returning a [`tendermint::v0_38::abci::response::CheckTx`]. +type ImmutableChecksResult = Result, response::CheckTx>; + +/// `Mempool` handles [`request::CheckTx`] abci requests. +/// +/// It performs stateless and stateful checks of the given transaction, +/// returning a [`response::CheckTx`]. #[derive(Clone)] pub(crate) struct Mempool { storage: Storage, inner: AppMempool, + /// A cache of recent results of immutable checks, indexed by tx hash. + cached_immutable_checks: Arc>, metrics: &'static Metrics, } @@ -71,6 +86,7 @@ impl Mempool { Self { storage, inner: mempool, + cached_immutable_checks: Arc::new(Cache::new(CACHE_SIZE)), metrics, } } @@ -89,12 +105,20 @@ impl Service for Mempool { use penumbra_tower_trace::v038::RequestExt as _; let span = req.create_span(); let storage = self.storage.clone(); - let mut mempool = self.inner.clone(); + let mempool = self.inner.clone(); + let cached_immutable_checks = self.cached_immutable_checks.clone(); let metrics = self.metrics; async move { let rsp = match req { MempoolRequest::CheckTx(req) => MempoolResponse::CheckTx( - handle_check_tx(req, storage.latest_snapshot(), &mut mempool, metrics).await, + handle_check_tx( + req, + storage.latest_snapshot(), + mempool, + cached_immutable_checks, + metrics, + ) + .await, ), }; Ok(rsp) @@ -110,268 +134,443 @@ impl Service for Mempool { /// as well as stateful checks (nonce and balance checks). /// /// If the tx passes all checks, status code 0 is returned. -#[allow(clippy::too_many_lines)] #[instrument(skip_all)] async fn handle_check_tx( - req: request::CheckTx, + request::CheckTx { + tx, + kind, + }: request::CheckTx, state: S, - mempool: &mut AppMempool, + mempool: AppMempool, + cached_immutable_checks: Arc>, metrics: &'static Metrics, ) -> response::CheckTx { - use sha2::Digest as _; - - let start_parsing = Instant::now(); - - let request::CheckTx { - tx, .. - } = req; + let start = Instant::now(); - let tx_hash = sha2::Sha256::digest(&tx).into(); + // So we don't waste time hashing a large object, we don't check the cache before the size + // check. let tx_len = tx.len(); - if tx_len > MAX_TX_SIZE { metrics.increment_check_tx_removed_too_large(); - return response::CheckTx { - code: Code::Err(AbciErrorCode::TRANSACTION_TOO_LARGE.value()), - log: format!( - "transaction size too large; allowed: {MAX_TX_SIZE} bytes, got {}", - tx.len() + return FailedCheck::new( + AbciErrorCode::TRANSACTION_TOO_LARGE, + format!( + "transaction size too large; allowed: {MAX_TX_SIZE} bytes, got: {tx_len} bytes", ), - info: AbciErrorCode::TRANSACTION_TOO_LARGE.info(), - ..response::CheckTx::default() - }; + ) + .into(); } - let raw_signed_tx = match raw::SignedTransaction::decode(tx) { + // Ok to hash the tx now and check in the cache. + let tx_hash = sha2::Sha256::digest(&tx).into(); + let signed_tx = match cached_immutable_checks + .get_value_or_guard_async(&tx_hash) + .await + { + Ok(Ok(cached_tx)) => { + // The previous `parse_and_run_immutable_checks` call was `Ok`: rerun mutable checks. + metrics.increment_check_tx_cache_hit(); + cached_tx + } + Ok(Err(cached_error_response)) => { + // The previous `parse_and_run_immutable_checks` call was `Err`: just return it. + metrics.increment_check_tx_cache_hit(); + return cached_error_response; + } + Err(guard) => { + if kind == request::CheckTxKind::Recheck { + warn!( + tx_hash = %telemetry::display::base64(&tx_hash), + "got a cache miss for recheck of tx" + ); + metrics.increment_check_tx_cache_miss(); + } + let immutable_checks_result = + parse_tx_and_run_immutable_checks(tx, start, &state, metrics).await; + + if guard.insert(immutable_checks_result.clone()).is_err() { + warn!( + tx_hash = %telemetry::display::base64(&tx_hash), + "failed to cache the check tx result" + ); + } + + match immutable_checks_result { + Ok(tx) => tx, + Err(response) => return response, + } + } + }; + + run_mutable_checks(signed_tx, tx_hash, tx_len, state, mempool, metrics) + .await + .unwrap_or_else(response::CheckTx::from) +} + +/// Parses and returns the signed tx from the request if and only if it passes immutable checks, +/// i.e. checks which will always pass or always fail. +async fn parse_tx_and_run_immutable_checks( + serialized_tx: Bytes, + mut start: Instant, + state: &S, + metrics: &'static Metrics, +) -> ImmutableChecksResult { + let raw_signed_tx = match raw::SignedTransaction::decode(serialized_tx) { Ok(tx) => tx, Err(e) => { - return response::CheckTx { - code: Code::Err(AbciErrorCode::INVALID_PARAMETER.value()), - log: e.to_string(), - info: "failed decoding bytes as a protobuf SignedTransaction".into(), - ..response::CheckTx::default() - }; + return Err(FailedCheck::new( + AbciErrorCode::INVALID_PARAMETER, + format!("failed decoding bytes as a protobuf SignedTransaction: {e}"), + ) + .into()); } }; let signed_tx = match SignedTransaction::try_from_raw(raw_signed_tx) { Ok(tx) => tx, Err(e) => { - return response::CheckTx { - code: Code::Err(AbciErrorCode::INVALID_PARAMETER.value()), - info: "the provided bytes was not a valid protobuf-encoded SignedTransaction, or \ - the signature was invalid" - .into(), - log: e.to_string(), - ..response::CheckTx::default() - }; + return Err(FailedCheck::new( + AbciErrorCode::INVALID_PARAMETER, + format!( + "the provided bytes were not a valid protobuf-encoded SignedTransaction, or \ + the signature was invalid: {e:#}" + ), + ) + .into()); } }; - let finished_parsing = Instant::now(); - metrics.record_check_tx_duration_seconds_parse_tx( - finished_parsing.saturating_duration_since(start_parsing), - ); + let mut end = Instant::now(); + metrics.record_check_tx_duration_seconds_parse_tx(end.saturating_duration_since(start)); + start = end; if let Err(e) = signed_tx.check_stateless().await { metrics.increment_check_tx_removed_failed_stateless(); - return response::CheckTx { - code: Code::Err(AbciErrorCode::INVALID_PARAMETER.value()), - info: "transaction failed stateless check".into(), - log: e.to_string(), - ..response::CheckTx::default() - }; - }; - - let finished_check_stateless = Instant::now(); - metrics.record_check_tx_duration_seconds_check_stateless( - finished_check_stateless.saturating_duration_since(finished_parsing), - ); - - if let Err(e) = transaction::check_nonce_mempool(&signed_tx, &state).await { - metrics.increment_check_tx_removed_stale_nonce(); - return response::CheckTx { - code: Code::Err(AbciErrorCode::INVALID_NONCE.value()), - info: "failed verifying transaction nonce".into(), - log: e.to_string(), - ..response::CheckTx::default() - }; + return Err(FailedCheck::new( + AbciErrorCode::INVALID_PARAMETER, + format!("transaction failed stateless check: {e:#}"), + ) + .into()); }; - let finished_check_nonce = Instant::now(); - metrics.record_check_tx_duration_seconds_check_nonce( - finished_check_nonce.saturating_duration_since(finished_check_stateless), - ); + end = Instant::now(); + metrics.record_check_tx_duration_seconds_check_stateless(end.saturating_duration_since(start)); + start = end; - if let Err(e) = transaction::check_chain_id_mempool(&signed_tx, &state).await { - return response::CheckTx { - code: Code::Err(AbciErrorCode::INVALID_CHAIN_ID.value()), - info: "failed verifying chain id".into(), - log: e.to_string(), - ..response::CheckTx::default() - }; + if let Err(e) = transaction::check_chain_id_mempool(&signed_tx, state).await { + return Err(FailedCheck::new(AbciErrorCode::INVALID_CHAIN_ID, e).into()); } - let finished_check_chain_id = Instant::now(); - metrics.record_check_tx_duration_seconds_check_chain_id( - finished_check_chain_id.saturating_duration_since(finished_check_nonce), - ); - - if let Some(removal_reason) = mempool.check_removed_comet_bft(tx_hash).await { - match removal_reason { - RemovalReason::Expired => { - metrics.increment_check_tx_removed_expired(); - return response::CheckTx { - code: Code::Err(AbciErrorCode::TRANSACTION_EXPIRED.value()), - info: "transaction expired in app's mempool".into(), - log: "Transaction expired in the app's mempool".into(), - ..response::CheckTx::default() - }; - } - RemovalReason::FailedPrepareProposal(err) => { - metrics.increment_check_tx_removed_failed_execution(); - return response::CheckTx { - code: Code::Err(AbciErrorCode::TRANSACTION_FAILED.value()), - info: "transaction failed execution in prepare_proposal()".into(), - log: format!("transaction failed execution because: {err}"), - ..response::CheckTx::default() - }; - } - RemovalReason::NonceStale => { - return response::CheckTx { - code: Code::Err(AbciErrorCode::INVALID_NONCE.value()), - info: "transaction removed from app mempool due to stale nonce".into(), - log: "Transaction from app mempool due to stale nonce".into(), - ..response::CheckTx::default() - }; - } - RemovalReason::LowerNonceInvalidated => { - return response::CheckTx { - code: Code::Err(AbciErrorCode::LOWER_NONCE_INVALIDATED.value()), - info: "transaction removed from app mempool due to lower nonce being \ - invalidated" - .into(), - log: "Transaction removed from app mempool due to lower nonce being \ - invalidated" - .into(), - ..response::CheckTx::default() - }; - } - } - }; + metrics.record_check_tx_duration_seconds_check_chain_id(start.elapsed()); - let finished_check_removed = Instant::now(); - metrics.record_check_tx_duration_seconds_check_removed( - finished_check_removed.saturating_duration_since(finished_check_chain_id), - ); + Ok(Arc::new(signed_tx)) +} - // tx is valid, push to mempool with current state - let address = match state - .try_base_prefixed(&signed_tx.verification_key().address_bytes()) - .await - .context("failed to generate address for signed transaction") - { - Err(err) => { - return response::CheckTx { - code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), - info: AbciErrorCode::INTERNAL_ERROR.info(), - log: format!("failed to generate address because: {err:#}"), - ..response::CheckTx::default() - }; - } - Ok(address) => address, - }; +async fn run_mutable_checks( + signed_tx: Arc, + tx_hash: [u8; 32], + tx_len: usize, + state: S, + mempool: AppMempool, + metrics: &'static Metrics, +) -> Result { + let mut start = Instant::now(); + let current_account_nonce = + get_current_nonce_if_tx_nonce_valid(&signed_tx, &state, metrics).await?; + let mut end = Instant::now(); + metrics.record_check_tx_duration_seconds_check_nonce(end.saturating_duration_since(start)); + start = end; + + check_removed_comet_bft(tx_hash, &mempool, metrics).await?; + end = Instant::now(); + metrics.record_check_tx_duration_seconds_check_removed(end.saturating_duration_since(start)); + start = end; + + let address = convert_address(&signed_tx, &state).await?; + end = Instant::now(); + metrics.record_check_tx_duration_seconds_convert_address(end.saturating_duration_since(start)); + start = end; - // fetch current account - let current_account_nonce = match state - .get_account_nonce(address) - .await - .wrap_err("failed fetching nonce for account") - { - Err(err) => { - return response::CheckTx { - code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), - info: AbciErrorCode::INTERNAL_ERROR.info(), - log: format!("failed to fetch account nonce because: {err:#}"), - ..response::CheckTx::default() - }; - } - Ok(nonce) => nonce, - }; + // grab cost of transaction + let transaction_cost = get_total_transaction_cost(&signed_tx, &state).await?; + let end = Instant::now(); + metrics.record_check_tx_duration_seconds_fetch_tx_cost(end.saturating_duration_since(start)); + start = end; - let finished_convert_address = Instant::now(); - metrics.record_check_tx_duration_seconds_convert_address( - finished_convert_address.saturating_duration_since(finished_check_removed), - ); + // grab current account's balances + let current_account_balance = get_account_balances(address, &state).await?; + let end = Instant::now(); + metrics.record_check_tx_duration_seconds_fetch_balances(end.saturating_duration_since(start)); + start = end; - // grab cost of transaction - let transaction_cost = match transaction::get_total_transaction_cost(&signed_tx, &state) + let actions_count = signed_tx.actions().len(); + + let mempool_len = insert_to_mempool( + &mempool, + signed_tx, + current_account_nonce, + current_account_balance, + transaction_cost, + ) + .await?; + + metrics.record_check_tx_duration_seconds_insert_to_app_mempool(start.elapsed()); + metrics.record_actions_per_transaction_in_mempool(actions_count); + metrics.record_transaction_in_mempool_size_bytes(tx_len); + metrics.set_transactions_in_mempool_total(mempool_len); + + Ok(response::CheckTx::default()) +} + +async fn get_current_nonce_if_tx_nonce_valid( + signed_tx: &SignedTransaction, + state: &S, + metrics: &Metrics, +) -> Result { + transaction::get_current_nonce_if_tx_nonce_valid(signed_tx, state) .await - .context("failed fetching cost of the transaction") - { - Err(err) => { - return response::CheckTx { - code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), - info: AbciErrorCode::INTERNAL_ERROR.info(), - log: format!("failed to fetch cost of the transaction because: {err:#}"), - ..response::CheckTx::default() - }; - } - Ok(transaction_cost) => transaction_cost, - }; + .map_err(|error| { + metrics.increment_check_tx_removed_stale_nonce(); + FailedCheck::new(AbciErrorCode::INVALID_NONCE, error) + }) +} - let finished_fetch_tx_cost = Instant::now(); - metrics.record_check_tx_duration_seconds_fetch_tx_cost( - finished_fetch_tx_cost.saturating_duration_since(finished_convert_address), - ); +async fn check_removed_comet_bft( + tx_hash: [u8; 32], + mempool: &AppMempool, + metrics: &Metrics, +) -> Result<(), FailedCheck> { + let Some(removal_reason) = mempool.check_removed_comet_bft(tx_hash).await else { + return Ok(()); + }; + match removal_reason { + RemovalReason::Expired => { + metrics.increment_check_tx_removed_expired(); + Err(FailedCheck::new( + AbciErrorCode::TRANSACTION_EXPIRED, + "transaction expired in the app's mempool", + )) + } + RemovalReason::FailedPrepareProposal(err) => { + metrics.increment_check_tx_removed_failed_execution(); + Err(FailedCheck::new( + AbciErrorCode::TRANSACTION_FAILED, + format!("transaction failed execution: {err}"), + )) + } + RemovalReason::NonceStale => Err(FailedCheck::new( + AbciErrorCode::INVALID_NONCE, + "transaction removed from app mempool due to stale nonce", + )), + RemovalReason::LowerNonceInvalidated => Err(FailedCheck::new( + AbciErrorCode::LOWER_NONCE_INVALIDATED, + "transaction removed from app mempool due to lower nonce being invalidated", + )), + } +} - // grab current account's balances - let current_account_balance: HashMap = - match get_account_balances(&state, address) - .await - .with_context(|| "failed fetching balances for account `{address}`") - { - Err(err) => { - return response::CheckTx { - code: Code::Err(AbciErrorCode::INTERNAL_ERROR.value()), - info: AbciErrorCode::INTERNAL_ERROR.info(), - log: format!("failed to fetch account balances because: {err:#}"), - ..response::CheckTx::default() - }; - } - Ok(account_balance) => account_balance, - }; +async fn convert_address( + signed_tx: &SignedTransaction, + state: &S, +) -> Result { + state + .try_base_prefixed(&signed_tx.verification_key().address_bytes()) + .await + .map_err(|error| { + FailedCheck::new( + AbciErrorCode::INTERNAL_ERROR, + format!("failed to generate address for signed transaction because: {error}"), + ) + }) +} - let finished_fetch_balances = Instant::now(); - metrics.record_check_tx_duration_seconds_fetch_balances( - finished_fetch_balances.saturating_duration_since(finished_fetch_tx_cost), - ); +async fn get_total_transaction_cost( + signed_tx: &SignedTransaction, + state: &S, +) -> Result, FailedCheck> { + transaction::get_total_transaction_cost(signed_tx, state) + .await + .map_err(|error| { + FailedCheck::new( + AbciErrorCode::INTERNAL_ERROR, + format!("failed to fetch cost of the transaction because: {error}"), + ) + }) +} - let actions_count = signed_tx.actions().len(); +async fn get_account_balances( + address: Address, + state: &S, +) -> Result, FailedCheck> { + mempool::get_account_balances(&state, address) + .await + .map_err(|error| { + FailedCheck::new( + AbciErrorCode::INTERNAL_ERROR, + format!("failed to fetch account balances for {address} because: {error}"), + ) + }) +} - if let Err(err) = mempool +async fn insert_to_mempool( + mempool: &AppMempool, + signed_tx: Arc, + current_account_nonce: u32, + current_account_balance: HashMap, + transaction_cost: HashMap, +) -> Result { + mempool .insert( - Arc::new(signed_tx), + signed_tx, current_account_nonce, current_account_balance, transaction_cost, ) .await - { - return response::CheckTx { - code: Code::Err(AbciErrorCode::TRANSACTION_INSERTION_FAILED.value()), - info: "transaction insertion failed".into(), - log: format!("transaction insertion failed because: {err:#}"), + .map_err(|error| { + FailedCheck::new( + AbciErrorCode::TRANSACTION_INSERTION_FAILED, + format!("transaction insertion failed because: {error}"), + ) + })?; + Ok(mempool.len().await) +} + +struct FailedCheck { + code: AbciErrorCode, + log: String, +} + +impl FailedCheck { + // allow: more convenient at callsites to take by value here. + #[allow(clippy::needless_pass_by_value)] + fn new(code: AbciErrorCode, log: T) -> Self { + Self { + code, + log: log.to_string(), + } + } +} + +impl From for response::CheckTx { + fn from(failure: FailedCheck) -> Self { + response::CheckTx { + code: tendermint::abci::Code::Err(failure.code.value()), + info: failure.code.info().to_string(), + log: failure.log, ..response::CheckTx::default() - }; + } } +} - let mempool_len = mempool.len().await; +#[cfg(test)] +mod tests { + use astria_core::{ + crypto::SigningKey, + protocol::transaction::v1alpha1::{ + action::ValidatorUpdate, + Action, + TransactionParams, + UnsignedTransaction, + }, + }; + use cnidarium::{ + StateDelta, + TempStorage, + }; + use telemetry::Metrics; + use tendermint::abci::Code; + + use super::*; + use crate::{ + accounts::StateWriteExt as _, + address::StateWriteExt as _, + bridge::StateWriteExt as _, + ibc::StateWriteExt as _, + state_ext::StateWriteExt as _, + }; - metrics - .record_check_tx_duration_seconds_insert_to_app_mempool(finished_fetch_balances.elapsed()); - metrics.record_actions_per_transaction_in_mempool(actions_count); - metrics.record_transaction_in_mempool_size_bytes(tx_len); - metrics.set_transactions_in_mempool_total(mempool_len); + #[tokio::test] + async fn should_cache_failure() { + let storage = TempStorage::new().await.unwrap(); + let mempool = AppMempool::new(); + let cached_immutable_checks = Arc::new(Cache::new(CACHE_SIZE)); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let request = request::CheckTx { + tx: Bytes::new(), + kind: request::CheckTxKind::New, + }; + let tx_hash: [u8; 32] = sha2::Sha256::digest(&request.tx).into(); + + // Should fail to parse and get added to the cache as `Err(response::CheckTx)`. + let response = handle_check_tx( + request, + storage.latest_snapshot(), + mempool.clone(), + cached_immutable_checks.clone(), + metrics, + ) + .await; + assert_eq!( + response.code.value(), + AbciErrorCode::INVALID_PARAMETER.value().get(), + "{response:?}" + ); + assert_eq!(cached_immutable_checks.len(), 1); + let cached_result = cached_immutable_checks.get(&tx_hash).unwrap(); + assert_eq!(cached_result.unwrap_err(), response); + } - response::CheckTx::default() + #[tokio::test] + async fn should_cache_success() { + let nonce = 1; + let chain_id = "chain-id"; + + let storage = TempStorage::new().await.unwrap(); + let snapshot = storage.latest_snapshot(); + let mut state_delta = StateDelta::new(snapshot); + state_delta + .put_chain_id_and_revision_number(tendermint::chain::Id::try_from(chain_id).unwrap()); + state_delta.put_transfer_base_fee(1).unwrap(); + state_delta.put_ics20_withdrawal_base_fee(1).unwrap(); + state_delta.put_init_bridge_account_base_fee(1); + state_delta.put_bridge_lock_byte_cost_multiplier(1); + state_delta.put_bridge_sudo_change_base_fee(1); + state_delta.put_base_prefix("a"); + let mempool = AppMempool::new(); + let cached_immutable_checks = Arc::new(Cache::new(CACHE_SIZE)); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let signing_key = SigningKey::from([1; 32]); + let action = ValidatorUpdate { + power: 0, + verification_key: signing_key.verification_key(), + }; + let unsigned_tx = UnsignedTransaction { + actions: vec![Action::ValidatorUpdate(action)], + params: TransactionParams::builder() + .nonce(nonce) + .chain_id(chain_id) + .build(), + }; + let signed_tx = unsigned_tx.into_signed(&signing_key).to_raw(); + let request = request::CheckTx { + tx: signed_tx.encode_to_vec().into(), + kind: request::CheckTxKind::New, + }; + let tx_hash: [u8; 32] = sha2::Sha256::digest(&request.tx).into(); + + // Should parse, pass immutable checks and get added to the cache as + // `Ok(SignedTransaction)`. + let response = handle_check_tx( + request, + state_delta, + mempool.clone(), + cached_immutable_checks.clone(), + metrics, + ) + .await; + assert_eq!(response.code, Code::Ok, "{response:?}"); + assert_eq!(cached_immutable_checks.len(), 1); + let cached_result = cached_immutable_checks.get(&tx_hash).unwrap(); + assert_eq!(cached_result.unwrap().to_raw(), signed_tx); + } } diff --git a/crates/astria-sequencer/src/transaction/checks.rs b/crates/astria-sequencer/src/transaction/checks.rs index f7f7985c3a..19c14adb79 100644 --- a/crates/astria-sequencer/src/transaction/checks.rs +++ b/crates/astria-sequencer/src/transaction/checks.rs @@ -31,11 +31,13 @@ use crate::{ state_ext::StateReadExt as _, }; +/// Returns the currently stored nonce of the tx signer's account if the tx nonce is not less than +/// it. #[instrument(skip_all)] -pub(crate) async fn check_nonce_mempool( +pub(crate) async fn get_current_nonce_if_tx_nonce_valid( tx: &SignedTransaction, state: &S, -) -> Result<()> { +) -> Result { let signer_address = state .try_base_prefixed(&tx.verification_key().address_bytes()) .await @@ -48,7 +50,7 @@ pub(crate) async fn check_nonce_mempool( .await .wrap_err("failed to get account nonce")?; ensure!(tx.nonce() >= curr_nonce, "nonce already used by account"); - Ok(()) + Ok(curr_nonce) } #[instrument(skip_all)] @@ -64,6 +66,17 @@ pub(crate) async fn check_chain_id_mempool( Ok(()) } +#[instrument(skip_all)] +pub(crate) async fn check_balance_mempool( + tx: &SignedTransaction, + state: &S, +) -> Result<()> { + check_balance_for_total_fees_and_transfers(tx, state) + .await + .context("balance check for total fees and transfers failed")?; + Ok(()) +} + #[instrument(skip_all)] pub(crate) async fn get_fees_for_transaction( tx: &UnsignedTransaction, diff --git a/crates/astria-sequencer/src/transaction/mod.rs b/crates/astria-sequencer/src/transaction/mod.rs index f0997f6738..c11923ca4a 100644 --- a/crates/astria-sequencer/src/transaction/mod.rs +++ b/crates/astria-sequencer/src/transaction/mod.rs @@ -20,7 +20,7 @@ use astria_eyre::{ pub(crate) use checks::{ check_balance_for_total_fees_and_transfers, check_chain_id_mempool, - check_nonce_mempool, + get_current_nonce_if_tx_nonce_valid, get_total_transaction_cost, }; use cnidarium::StateWrite;