Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/astria-sequencer/.gitignore

This file was deleted.

1 change: 0 additions & 1 deletion crates/astria-sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ cnidarium = { git = "https://github.com/penumbra-zone/penumbra.git", tag = "v0.7
cnidarium-component = { git = "https://github.com/penumbra-zone/penumbra.git", tag = "v0.78.0" }
ibc-proto = { version = "0.41.0", features = ["server"] }
matchit = "0.7.2"
priority-queue = "2.0.2"
tower = "0.4"
tower-abci = "0.12.0"
tower-actor = "0.1.0"
Expand Down
21 changes: 10 additions & 11 deletions crates/astria-sequencer/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,15 +492,16 @@ impl App {
// get copy of transactions to execute from mempool
let current_account_nonce_getter =
|address: [u8; 20]| self.state.get_account_nonce(address);
let mut pending_txs = self
let pending_txs = self
.mempool
.builder_queue(current_account_nonce_getter)
.await
.expect("failed to fetch pending transactions");

while let Some((timemarked_tx, _)) = pending_txs.pop() {
let tx_hash_base64 = telemetry::display::base64(&timemarked_tx.tx_hash()).to_string();
let tx = timemarked_tx.signed_tx();
let mut unused_count = pending_txs.len();
for (tx_hash, tx) in pending_txs {
unused_count = unused_count.saturating_sub(1);
let tx_hash_base64 = telemetry::display::base64(&tx_hash).to_string();
let bytes = tx.to_raw().encode_to_vec();
let tx_len = bytes.len();
info!(transaction_hash = %tx_hash_base64, "executing transaction");
Expand Down Expand Up @@ -545,7 +546,7 @@ impl App {
}

// execute tx and store in `execution_results` list on success
match self.execute_transaction(Arc::new(tx.clone())).await {
match self.execute_transaction(tx.clone()).await {
Ok(events) => {
execution_results.push(ExecTxResult {
events,
Expand Down Expand Up @@ -605,7 +606,7 @@ impl App {
excluded_txs.saturating_add(failed_tx_count),
);

debug!("{} {}", pending_txs.len(), "leftover pending transactions");
debug!("{unused_count} leftover pending transactions");
self.metrics
.set_transactions_in_mempool_total(self.mempool.len().await);

Expand Down Expand Up @@ -891,9 +892,7 @@ impl App {
.context("failed to write sequencer block to state")?;

// update the priority of any txs in the mempool based on the updated app state
update_mempool_after_finalization(&mut self.mempool, &state_tx)
.await
.context("failed to update mempool after finalization")?;
update_mempool_after_finalization(&mut self.mempool, &state_tx).await;

// events that occur after end_block are ignored here;
// there should be none anyways.
Expand Down Expand Up @@ -1149,9 +1148,9 @@ impl App {
async fn update_mempool_after_finalization<S: accounts::StateReadExt>(
mempool: &mut Mempool,
state: &S,
) -> anyhow::Result<()> {
) {
let current_account_nonce_getter = |address: [u8; 20]| state.get_account_nonce(address);
mempool.run_maintenance(current_account_nonce_getter).await
mempool.run_maintenance(current_account_nonce_getter).await;
}

/// relevant data of a block being executed.
Expand Down
31 changes: 7 additions & 24 deletions crates/astria-sequencer/src/app/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use astria_core::{
crypto::SigningKey,
primitive::v1::RollupId,
Expand Down Expand Up @@ -137,44 +139,25 @@ pub(crate) async fn initialize_app(
app
}

pub(crate) fn get_mock_tx(nonce: u32) -> SignedTransaction {
let tx = UnsignedTransaction {
params: TransactionParams::builder()
.nonce(nonce)
.chain_id("test")
.build(),
actions: vec![
SequenceAction {
rollup_id: RollupId::from_unhashed_bytes([0; 32]),
data: vec![0x99],
fee_asset: "astria".parse().unwrap(),
}
.into(),
],
};

tx.into_signed(&get_alice_signing_key())
}

pub(crate) fn get_mock_tx_parameterized(
pub(crate) fn mock_tx(
nonce: u32,
signer: &SigningKey,
data_bytes: [u8; 32],
) -> SignedTransaction {
rollup_name: &str,
) -> Arc<SignedTransaction> {
let tx = UnsignedTransaction {
params: TransactionParams::builder()
.nonce(nonce)
.chain_id("test")
.build(),
actions: vec![
SequenceAction {
rollup_id: RollupId::from_unhashed_bytes(data_bytes),
rollup_id: RollupId::from_unhashed_bytes(rollup_name.as_bytes()),
data: vec![0x99],
fee_asset: "astria".parse().unwrap(),
}
.into(),
],
};

tx.into_signed(signer)
Arc::new(tx.into_signed(signer))
}
20 changes: 8 additions & 12 deletions crates/astria-sequencer/src/app/tests_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ async fn app_execution_results_match_proposal_vs_after_proposal() {
// don't commit the result, now call prepare_proposal with the same data.
// this will reset the app state.
// this simulates executing the same block as a validator (specifically the proposer).
app.mempool.insert(signed_tx, 0).await.unwrap();
app.mempool.insert(Arc::new(signed_tx), 0).await.unwrap();

let proposer_address = [88u8; 20].to_vec().try_into().unwrap();
let prepare_proposal = PrepareProposal {
Expand All @@ -474,8 +474,7 @@ async fn app_execution_results_match_proposal_vs_after_proposal() {
let current_account_nonce_getter = |address: [u8; 20]| app.state.get_account_nonce(address);
app.mempool
.run_maintenance(current_account_nonce_getter)
.await
.unwrap();
.await;

assert_eq!(app.mempool.len().await, 0);

Expand Down Expand Up @@ -565,8 +564,8 @@ async fn app_prepare_proposal_cometbft_max_bytes_overflow_ok() {
}
.into_signed(&alice);

app.mempool.insert(tx_pass, 0).await.unwrap();
app.mempool.insert(tx_overflow, 0).await.unwrap();
app.mempool.insert(Arc::new(tx_pass), 0).await.unwrap();
app.mempool.insert(Arc::new(tx_overflow), 0).await.unwrap();

// send to prepare_proposal
let prepare_args = abci::request::PrepareProposal {
Expand All @@ -589,9 +588,7 @@ async fn app_prepare_proposal_cometbft_max_bytes_overflow_ok() {
let current_account_nonce_getter = |address: [u8; 20]| app.state.get_account_nonce(address);
app.mempool
.run_maintenance(current_account_nonce_getter)
.await
.unwrap();

.await;
// see only first tx made it in
assert_eq!(
result.txs.len(),
Expand Down Expand Up @@ -645,8 +642,8 @@ async fn app_prepare_proposal_sequencer_max_bytes_overflow_ok() {
}
.into_signed(&alice);

app.mempool.insert(tx_pass, 0).await.unwrap();
app.mempool.insert(tx_overflow, 0).await.unwrap();
app.mempool.insert(Arc::new(tx_pass), 0).await.unwrap();
app.mempool.insert(Arc::new(tx_overflow), 0).await.unwrap();

// send to prepare_proposal
let prepare_args = abci::request::PrepareProposal {
Expand All @@ -669,8 +666,7 @@ async fn app_prepare_proposal_sequencer_max_bytes_overflow_ok() {
let current_account_nonce_getter = |address: [u8; 20]| app.state.get_account_nonce(address);
app.mempool
.run_maintenance(current_account_nonce_getter)
.await
.unwrap();
.await;

// see only first tx made it in
assert_eq!(
Expand Down
8 changes: 4 additions & 4 deletions crates/astria-sequencer/src/grpc/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,12 @@ mod test {
let alice = get_alice_signing_key();
let alice_address = astria_address(&alice.address_bytes());
let nonce = 99;
let tx = crate::app::test_utils::get_mock_tx(nonce);
let tx = crate::app::test_utils::mock_tx(nonce, &get_alice_signing_key(), "test");
mempool.insert(tx, 0).await.unwrap();

// insert tx into pending
let nonce = 0;
let tx = crate::app::test_utils::get_mock_tx(nonce);
// insert a tx with lower nonce also, but we should get the highest nonce
let lower_nonce = 98;
let tx = crate::app::test_utils::mock_tx(lower_nonce, &get_alice_signing_key(), "test");
mempool.insert(tx, 0).await.unwrap();

let server = Arc::new(SequencerServer::new(storage.clone(), mempool));
Expand Down
63 changes: 49 additions & 14 deletions crates/astria-sequencer/src/mempool/benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

use std::{
collections::HashMap,
sync::OnceLock,
sync::{
Arc,
OnceLock,
},
time::Duration,
};

Expand All @@ -25,6 +28,10 @@ use astria_core::{
UnsignedTransaction,
},
};
use sha2::{
Digest as _,
Sha256,
};

use super::{
Mempool,
Expand Down Expand Up @@ -52,8 +59,8 @@ fn signing_keys() -> impl Iterator<Item = &'static SigningKey> {
}

/// Returns a static ref to a collection of `MAX_INITIAL_TXS + 1` transactions.
fn transactions() -> &'static Vec<SignedTransaction> {
static TXS: OnceLock<Vec<SignedTransaction>> = OnceLock::new();
fn transactions() -> &'static Vec<Arc<SignedTransaction>> {
static TXS: OnceLock<Vec<Arc<SignedTransaction>>> = OnceLock::new();
TXS.get_or_init(|| {
let mut nonces_and_chain_ids = HashMap::new();
signing_keys()
Expand All @@ -74,11 +81,12 @@ fn transactions() -> &'static Vec<SignedTransaction> {
data: vec![2; 1000],
fee_asset: Denom::IbcPrefixed(IbcPrefixed::new([3; 32])),
};
UnsignedTransaction {
let tx = UnsignedTransaction {
actions: vec![Action::Sequence(sequence_action)],
params,
}
.into_signed(signing_key)
.into_signed(signing_key);
Arc::new(tx)
})
.take(MAX_INITIAL_TXS + 1)
.collect()
Expand Down Expand Up @@ -153,13 +161,21 @@ fn init_mempool<T: MempoolSize>() -> Mempool {
for tx in transactions().iter().take(T::checked_size()) {
mempool.insert(tx.clone(), 0).await.unwrap();
}
for i in 0..super::REMOVAL_CACHE_SIZE {
let hash = Sha256::digest(i.to_le_bytes()).into();
mempool
.comet_bft_removal_cache
.write()
.await
.add(hash, RemovalReason::Expired);
}
});
mempool
}

/// Returns the first transaction from the static `transactions()` not included in the initialized
/// mempool, i.e. the one at index `T::size()`.
fn get_unused_tx<T: MempoolSize>() -> SignedTransaction {
fn get_unused_tx<T: MempoolSize>() -> Arc<SignedTransaction> {
transactions().get(T::checked_size()).unwrap().clone()
}

Expand Down Expand Up @@ -190,8 +206,7 @@ fn insert<T: MempoolSize>(bencher: divan::Bencher) {

/// Benchmarks `Mempool::builder_queue` on a mempool with the given number of existing entries.
///
/// Note: this benchmark doesn't capture the nuances of dealing with parked vs pending
/// transactions.
/// Note: this benchmark doesn't capture the nuances of dealing with parked vs pending transactions.
#[divan::bench(
max_time = MAX_TIME,
types = [
Expand Down Expand Up @@ -222,8 +237,8 @@ fn builder_queue<T: MempoolSize>(bencher: divan::Bencher) {
/// Benchmarks `Mempool::remove_tx_invalid` for a single transaction on a mempool with the given
/// number of existing entries.
///
/// Note about this benchmark: `remove_tx_invalid()` will removes all higher nonces. To keep this
/// benchmark comparible with the previous mempool, we're removing the highest nonce. In the future
/// Note about this benchmark: `remove_tx_invalid()` will remove all higher nonces. To keep this
/// benchmark comparable with the previous mempool, we're removing the highest nonce. In the future
/// it would be better to have this bench remove the midpoint.
#[divan::bench(
max_time = MAX_TIME,
Expand All @@ -243,6 +258,7 @@ fn remove_tx_invalid<T: MempoolSize>(bencher: divan::Bencher) {
.with_inputs(|| {
let signed_tx = transactions()
.get(T::checked_size().saturating_sub(1))
.cloned()
.unwrap();
(init_mempool::<T>(), signed_tx)
})
Expand All @@ -255,6 +271,28 @@ fn remove_tx_invalid<T: MempoolSize>(bencher: divan::Bencher) {
});
}

/// Benchmarks `Mempool::check_removed_comet_bft` for a single transaction on a mempool with the
/// `comet_bft_removal_cache` filled.
///
/// Note that the number of entries in the main cache is irrelevant here.
#[divan::bench(max_time = MAX_TIME)]
fn check_removed_comet_bft(bencher: divan::Bencher) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
bencher
.with_inputs(|| {
let tx_hash = Sha256::digest(0_usize.to_le_bytes()).into();
(init_mempool::<mempool_with_100_txs>(), tx_hash)
})
.bench_values(move |(mempool, tx_hash)| {
runtime.block_on(async {
mempool.check_removed_comet_bft(tx_hash).await.unwrap();
});
});
}

/// Benchmarks `Mempool::run_maintenance` on a mempool with the given number of existing entries.
#[divan::bench(
max_time = MAX_TIME,
Expand Down Expand Up @@ -283,10 +321,7 @@ fn run_maintenance<T: MempoolSize>(bencher: divan::Bencher) {
.with_inputs(|| init_mempool::<T>())
.bench_values(move |mempool| {
runtime.block_on(async {
mempool
.run_maintenance(current_account_nonce_getter)
.await
.unwrap();
mempool.run_maintenance(current_account_nonce_getter).await;
});
});
}
Expand Down
Loading