diff --git a/Cargo.lock b/Cargo.lock index 21684370bd..c34e200012 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -771,6 +771,7 @@ dependencies = [ "bytes", "cnidarium", "cnidarium-component", + "divan", "futures", "hex", "ibc-proto", diff --git a/Cargo.toml b/Cargo.toml index 2d6795d817..e55b7f13e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,7 @@ celestia-tendermint = "0.32.1" celestia-types = "0.1.1" clap = "4.5.4" const_format = "0.2.32" +divan = "0.1.14" ethers = { version = "2.0.11", default-features = false } futures = "0.3" hex = "0.4" @@ -68,20 +69,18 @@ humantime = "2.1.0" hyper = "0.14" ibc-types = "0.12" indexmap = "2.1.0" +insta = "1.36.1" itertools = "0.12.1" itoa = "1.0.10" jsonrpsee = { version = "0.20" } -once_cell = "1.17.1" -pin-project-lite = "0.2.13" -sha2 = "0.10" -serde = "1" -serde_json = "1" metrics = "0.22.1" +once_cell = "1.17.1" pbjson-types = "0.6" # Note that when updating the penumbra versions, vendored types in `proto/sequencerapis/astria_vendored` may need to be updated as well. penumbra-ibc = { git = "https://github.com/penumbra-zone/penumbra.git", tag = "v0.78.0", default-features = false } penumbra-proto = { git = "https://github.com/penumbra-zone/penumbra.git", tag = "v0.78.0" } penumbra-tower-trace = { git = "https://github.com/penumbra-zone/penumbra.git", tag = "v0.78.0" } +pin-project-lite = "0.2.13" prost = "0.12" rand = "0.8.5" regex = "1.9" @@ -90,6 +89,9 @@ regex = "1.9" reqwest = { version = "0.11", default-features = false, features = [ "rustls-tls", ] } +serde = "1" +serde_json = "1" +sha2 = "0.10" tempfile = "3.6.0" tendermint = "0.34.0" tendermint-config = "0.34.0" @@ -97,6 +99,7 @@ tendermint-proto = "0.34.0" tendermint-rpc = "0.34.0" thiserror = "1" tokio = "1.28" +tokio-stream = { version = "0.1.14" } tokio-test = "0.4.2" tokio-util = "0.7.9" tonic = "0.10" @@ -104,5 +107,3 @@ tracing = "0.1" tryhard = "0.5.1" which = "4.4.0" wiremock = "0.5" -insta = "1.36.1" -tokio-stream = { version = "0.1.14" } diff --git a/crates/astria-merkle/Cargo.toml b/crates/astria-merkle/Cargo.toml index c80203cfa7..922689da44 100644 --- a/crates/astria-merkle/Cargo.toml +++ b/crates/astria-merkle/Cargo.toml @@ -13,7 +13,7 @@ sha2 = { workspace = true } [dev-dependencies] ct-merkle = "0.1.0" -divan = "0.1.14" +divan = { workspace = true } hex-literal = { workspace = true } [features] diff --git a/crates/astria-merkle/benches/benchmark.rs b/crates/astria-merkle/benches/benchmark.rs index cb70e988a7..3d10f45d34 100644 --- a/crates/astria-merkle/benches/benchmark.rs +++ b/crates/astria-merkle/benches/benchmark.rs @@ -1,5 +1,3 @@ -#![allow(clippy::wildcard_imports)] - use astria_merkle::Tree; use ct_merkle::CtMerkleTree; use divan::{ diff --git a/crates/astria-sequencer/Cargo.toml b/crates/astria-sequencer/Cargo.toml index a85fb864d5..03bab5a051 100644 --- a/crates/astria-sequencer/Cargo.toml +++ b/crates/astria-sequencer/Cargo.toml @@ -25,18 +25,21 @@ telemetry = { package = "astria-telemetry", path = "../astria-telemetry", featur anyhow = "1" borsh = { version = "1", features = ["derive"] } +cnidarium = { git = "https://github.com/penumbra-zone/penumbra.git", tag = "v0.78.0", features = [ + "metrics", +] } +cnidarium-component = { git = "https://github.com/penumbra-zone/penumbra.git", tag = "v0.78.0" } +ibc-proto = { version = "0.41.0", features = ["server"] } matchit = "0.7.2" priority-queue = "2.0.2" tower = "0.4" tower-abci = "0.12.0" tower-actor = "0.1.0" -cnidarium = { git = "https://github.com/penumbra-zone/penumbra.git", tag = "v0.78.0", features = [ - "metrics", -] } -cnidarium-component = { git = "https://github.com/penumbra-zone/penumbra.git", tag = "v0.78.0" } +tower-http = { version = "0.4", features = ["cors"] } async-trait = { workspace = true } bytes = { workspace = true } +divan = { workspace = true } futures = { workspace = true } hex = { workspace = true, features = ["serde"] } ibc-types = { workspace = true, features = ["with_serde"] } @@ -57,9 +60,6 @@ tokio = { workspace = true, features = ["rt", "tracing"] } tonic = { workspace = true } tracing = { workspace = true } -ibc-proto = { version = "0.41.0", features = ["server"] } -tower-http = { version = "0.4", features = ["cors"] } - [dev-dependencies] astria-core = { path = "../astria-core", features = [ "server", @@ -75,3 +75,7 @@ tokio = { workspace = true, features = ["test-util"] } [build-dependencies] astria-build-info = { path = "../astria-build-info", features = ["build"] } + +[[bench]] +name = "benchmark" +harness = false diff --git a/crates/astria-sequencer/benches/benchmark.rs b/crates/astria-sequencer/benches/benchmark.rs new file mode 100644 index 0000000000..57c6df628a --- /dev/null +++ b/crates/astria-sequencer/benches/benchmark.rs @@ -0,0 +1,17 @@ +// Required to force the benchmark target to actually register the divan benchmark cases. +use astria_sequencer as _; + +fn main() { + // Handle `nextest` querying the benchmark binary for tests. Currently `divan` is incompatible + // with `nextest`, so just report no tests available. + // See https://github.com/nvzqz/divan/issues/43 for further details. + let args: Vec<_> = std::env::args().collect(); + if args.contains(&"--list".to_string()) + && args.contains(&"--format".to_string()) + && args.contains(&"terse".to_string()) + { + return; + } + // Run registered benchmarks. + divan::main(); +} diff --git a/crates/astria-sequencer/src/mempool/benchmarks.rs b/crates/astria-sequencer/src/mempool/benchmarks.rs new file mode 100644 index 0000000000..dae8b13143 --- /dev/null +++ b/crates/astria-sequencer/src/mempool/benchmarks.rs @@ -0,0 +1,368 @@ +#![allow(non_camel_case_types)] + +use std::{ + collections::HashMap, + sync::OnceLock, + time::Duration, +}; + +use astria_core::{ + crypto::SigningKey, + primitive::v1::{ + asset::{ + Denom, + IbcPrefixed, + }, + Address, + RollupId, + }, + protocol::transaction::v1alpha1::{ + action::{ + Action, + SequenceAction, + }, + SignedTransaction, + TransactionParams, + UnsignedTransaction, + }, +}; +use sha2::{ + Digest as _, + Sha256, +}; + +use super::{ + Mempool, + RemovalReason, +}; + +/// The maximum number of transactions with which to initialize the mempool. +const MAX_INITIAL_TXS: usize = 100_000; +/// The max time for any benchmark. +const MAX_TIME: Duration = Duration::from_secs(30); +/// The number of different signers of transactions, and also the number of different chain IDs. +const SIGNER_COUNT: u8 = 10; + +/// Returns an endlessly-repeating iterator over `SIGNER_COUNT` separate signing keys. +fn signing_keys() -> impl Iterator { + static SIGNING_KEYS: OnceLock> = OnceLock::new(); + SIGNING_KEYS + .get_or_init(|| { + (0..SIGNER_COUNT) + .map(|i| SigningKey::from([i; 32])) + .collect() + }) + .iter() + .cycle() +} + +/// Returns a static ref to a collection of `MAX_INITIAL_TXS + 1` transactions. +fn transactions() -> &'static Vec { + static TXS: OnceLock> = OnceLock::new(); + TXS.get_or_init(|| { + crate::address::initialize_base_prefix("benchmarks").unwrap(); + let mut nonces_and_chain_ids = HashMap::new(); + signing_keys() + .map(move |signing_key| { + let verification_key = signing_key.verification_key(); + let (nonce, chain_id) = nonces_and_chain_ids + .entry(verification_key) + .or_insert_with(|| { + (0_u32, format!("chain-{}", signing_key.verification_key())) + }); + *nonce = (*nonce).wrapping_add(1); + let params = TransactionParams::builder() + .nonce(*nonce) + .chain_id(chain_id.as_str()) + .build(); + let sequence_action = SequenceAction { + rollup_id: RollupId::new([1; 32]), + data: vec![2; 1000], + fee_asset: Denom::IbcPrefixed(IbcPrefixed::new([3; 32])), + }; + UnsignedTransaction { + actions: vec![Action::Sequence(sequence_action)], + params, + } + .into_signed(signing_key) + }) + .take(MAX_INITIAL_TXS + 1) + .collect() + }) +} + +/// This trait exists so we can get better output from `divan` by configuring the various mempool +/// sizes as types rather than consts. With types we get output like: +/// ```text +/// ╰─ insert_new_tx +/// ├─ mempool_with_100_txs +/// ... +/// ╰─ mempool_with_100000_txs +/// ``` +/// rather than: +/// ```text +/// ╰─ insert_new_tx +/// ├─ 100 +/// ... +/// ╰─ 100000 +/// ``` +trait MempoolSize { + fn size() -> usize; + + fn checked_size() -> usize { + assert!(Self::size() <= MAX_INITIAL_TXS); + Self::size() + } +} + +struct mempool_with_100_txs; + +struct mempool_with_1000_txs; + +struct mempool_with_10000_txs; + +struct mempool_with_100000_txs; + +impl MempoolSize for mempool_with_100_txs { + fn size() -> usize { + 100 + } +} + +impl MempoolSize for mempool_with_1000_txs { + fn size() -> usize { + 1_000 + } +} + +impl MempoolSize for mempool_with_10000_txs { + fn size() -> usize { + 10_000 + } +} + +impl MempoolSize for mempool_with_100000_txs { + fn size() -> usize { + 100_000 + } +} + +/// Returns a new `Mempool` initialized with the number of transactions specified by `T::size()` +/// taken from the static `transactions()`, and with a full `comet_bft_removal_cache`. +fn init_mempool() -> Mempool { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let mempool = Mempool::new(); + runtime.block_on(async { + for tx in transactions().iter().take(T::checked_size()) { + mempool.insert(tx.clone(), 0).await.unwrap(); + } + for i in 0..super::REMOVAL_CACHE_SIZE { + let hash = Sha256::digest(i.to_le_bytes()).into(); + mempool + .track_removal_comet_bft(hash, RemovalReason::Expired) + .await; + } + }); + mempool +} + +/// Returns the first transaction from the static `transactions()` not included in the initialized +/// mempool, i.e. the one at index `T::size()`. +fn get_unused_tx() -> SignedTransaction { + transactions().get(T::checked_size()).unwrap().clone() +} + +/// Benchmarks `Mempool::insert` for a single new transaction on a mempool with the given number of +/// existing entries. +#[divan::bench( + max_time = MAX_TIME, + types = [ + mempool_with_100_txs, + mempool_with_1000_txs, + mempool_with_10000_txs, + mempool_with_100000_txs + ] +)] +fn insert(bencher: divan::Bencher) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + bencher + .with_inputs(|| (init_mempool::(), get_unused_tx::())) + .bench_values(move |(mempool, tx)| { + runtime.block_on(async { + mempool.insert(tx, 0).await.unwrap(); + }); + }); +} + +/// Benchmarks `Mempool::pop` on a mempool with the given number of existing entries. +#[divan::bench( + max_time = MAX_TIME, + types = [ + mempool_with_100_txs, + mempool_with_1000_txs, + mempool_with_10000_txs, + mempool_with_100000_txs + ] +)] +fn pop(bencher: divan::Bencher) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + bencher + .with_inputs(|| init_mempool::()) + .bench_values(move |mempool| { + runtime.block_on(async { + mempool.pop().await.unwrap(); + }); + }); +} + +/// Benchmarks `Mempool::remove` for a single transaction on a mempool with the given number of +/// existing entries. +#[divan::bench( + max_time = MAX_TIME, + types = [ + mempool_with_100_txs, + mempool_with_1000_txs, + mempool_with_10000_txs, + mempool_with_100000_txs + ] +)] +fn remove(bencher: divan::Bencher) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + bencher + .with_inputs(|| { + let tx_hash = transactions().first().unwrap().sha256_of_proto_encoding(); + (init_mempool::(), tx_hash) + }) + .bench_values(move |(mempool, tx_hash)| { + runtime.block_on(async { + mempool.remove(tx_hash).await; + }); + }); +} + +/// Benchmarks `Mempool::track_removal_comet_bft` for a single new transaction on a mempool with +/// the `comet_bft_removal_cache` filled. +/// +/// Note that the number of entries in the main cache is irrelevant here. +#[divan::bench(max_time = MAX_TIME)] +fn track_removal_comet_bft(bencher: divan::Bencher) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + bencher + .with_inputs(|| { + let tx_hash = transactions().first().unwrap().sha256_of_proto_encoding(); + (init_mempool::(), tx_hash) + }) + .bench_values(move |(mempool, tx_hash)| { + runtime.block_on(async { + mempool + .track_removal_comet_bft(tx_hash, RemovalReason::Expired) + .await; + }); + }); +} + +/// Benchmarks `Mempool::check_removed_comet_bft` for a single transaction on a mempool with the +/// `comet_bft_removal_cache` filled. +/// +/// Note that the number of entries in the main cache is irrelevant here. +#[divan::bench(max_time = MAX_TIME)] +fn check_removed_comet_bft(bencher: divan::Bencher) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + bencher + .with_inputs(|| { + let tx_hash = Sha256::digest(0_usize.to_le_bytes()).into(); + (init_mempool::(), tx_hash) + }) + .bench_values(move |(mempool, tx_hash)| { + runtime.block_on(async { + mempool.check_removed_comet_bft(tx_hash).await.unwrap(); + }); + }); +} + +/// Benchmarks `Mempool::run_maintenance` on a mempool with the given number of existing entries. +#[divan::bench( + max_time = MAX_TIME, + types = [ + mempool_with_100_txs, + mempool_with_1000_txs, + mempool_with_10000_txs, + mempool_with_100000_txs + ] +)] +fn run_maintenance(bencher: divan::Bencher) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + // Set the new nonce so that the entire `REMOVAL_CACHE_SIZE` entries in the + // `comet_bft_removal_cache` are replaced (assuming this test case has enough txs). + // allow: this is test-only code, using small values, and where the result is not critical. + #[allow(clippy::arithmetic_side_effects, clippy::cast_possible_truncation)] + let new_nonce = (super::REMOVAL_CACHE_SIZE as u32 / u32::from(SIGNER_COUNT)) + 1; + // Although in production this getter will be hitting the state store and will be slower than + // this test one, it's probably insignificant as the getter is only called once per address, + // and we don't expect a high number of discrete addresses in the mempool entries. + let current_account_nonce_getter = |_: Address| async { Ok(new_nonce) }; + bencher + .with_inputs(|| init_mempool::()) + .bench_values(move |mempool| { + runtime.block_on(async { + mempool + .run_maintenance(current_account_nonce_getter) + .await + .unwrap(); + }); + }); +} + +/// Benchmarks `Mempool::pending_nonce` on a mempool with the given number of existing entries. +#[divan::bench( + max_time = MAX_TIME, + types = [ + mempool_with_100_txs, + mempool_with_1000_txs, + mempool_with_10000_txs, + mempool_with_100000_txs + ] +)] +fn pending_nonce(bencher: divan::Bencher) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + bencher + .with_inputs(|| { + let address = crate::address::base_prefixed( + transactions() + .first() + .unwrap() + .verification_key() + .address_bytes(), + ); + (init_mempool::(), address) + }) + .bench_values(move |(mempool, address)| { + runtime.block_on(async { + mempool.pending_nonce(&address).await.unwrap(); + }); + }); +} diff --git a/crates/astria-sequencer/src/mempool.rs b/crates/astria-sequencer/src/mempool/mod.rs similarity index 99% rename from crates/astria-sequencer/src/mempool.rs rename to crates/astria-sequencer/src/mempool/mod.rs index a33035d65e..dcce7c3fc8 100644 --- a/crates/astria-sequencer/src/mempool.rs +++ b/crates/astria-sequencer/src/mempool/mod.rs @@ -1,3 +1,5 @@ +mod benchmarks; + use std::{ cmp::{ self, @@ -139,7 +141,7 @@ pub(crate) enum RemovalReason { FailedPrepareProposal(String), } -const TX_TTL: Duration = Duration::from_secs(600); // 10 minutes +const TX_TTL: Duration = Duration::from_secs(600); // 10 minutes const REMOVAL_CACHE_SIZE: usize = 4096; /// `RemovalCache` is used to signal to `CometBFT` that a @@ -310,10 +312,7 @@ impl Mempool { /// checks if a transaction was flagged to be removed from the `CometBFT` mempool /// and removes entry - pub(crate) async fn check_removed_comet_bft( - &mut self, - tx_hash: [u8; 32], - ) -> Option { + pub(crate) async fn check_removed_comet_bft(&self, tx_hash: [u8; 32]) -> Option { self.comet_bft_removal_cache.write().await.remove(tx_hash) }