Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/astria-core/src/primitive/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ pub struct IncorrectAddressLength {
received: usize,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct Address(
#[cfg_attr(feature = "serde", serde(serialize_with = "crate::serde::base64"))]
Expand Down
71 changes: 21 additions & 50 deletions crates/astria-sequencer/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,11 +474,12 @@ impl App {
let mut execution_results = Vec::new();
let mut txs_to_readd_to_mempool = Vec::new();

while let Some((tx, priority)) = self.mempool.pop().await {
while let Some((enqueued_tx, priority)) = self.mempool.pop().await {
let tx_hash_base64 = telemetry::display::base64(&enqueued_tx.tx_hash()).to_string();
let tx = enqueued_tx.signed_tx();
let bytes = tx.to_raw().encode_to_vec();
let tx_hash = Sha256::digest(&bytes);
let tx_len = bytes.len();
info!(tx_hash = %telemetry::display::hex(&tx_hash), "executing transaction");
info!(transaction_hash = %tx_hash_base64, "executing transaction");

// don't include tx if it would make the cometBFT block too large
if !block_size_constraints.cometbft_has_space(tx_len) {
Expand All @@ -487,12 +488,12 @@ impl App {
)
.increment(1);
debug!(
transaction_hash = %telemetry::display::base64(&tx_hash),
transaction_hash = %tx_hash_base64,
block_size_constraints = %json(&block_size_constraints),
tx_data_bytes = tx_len,
"excluding remaining transactions: max cometBFT data limit reached"
);
txs_to_readd_to_mempool.push((tx, priority));
txs_to_readd_to_mempool.push((enqueued_tx, priority));

// break from loop, as the block is full
break;
Expand All @@ -508,12 +509,12 @@ impl App {

if !block_size_constraints.sequencer_has_space(tx_sequence_data_bytes) {
debug!(
transaction_hash = %telemetry::display::base64(&tx_hash),
transaction_hash = %tx_hash_base64,
block_size_constraints = %json(&block_size_constraints),
tx_data_bytes = tx_sequence_data_bytes,
"excluding transaction: max block sequenced data limit reached"
);
txs_to_readd_to_mempool.push((tx, priority));
txs_to_readd_to_mempool.push((enqueued_tx, priority));

// continue as there might be non-sequence txs that can fit
continue;
Expand All @@ -533,15 +534,15 @@ impl App {
.cometbft_checked_add(tx_len)
.context("error growing cometBFT block size")?;
validated_txs.push(bytes.into());
included_signed_txs.push(tx);
included_signed_txs.push((*tx).clone());
}
Err(e) => {
metrics::counter!(
metrics_init::PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_DECODE_FAILURE
)
.increment(1);
debug!(
transaction_hash = %telemetry::display::base64(&tx_hash),
transaction_hash = %tx_hash_base64,
error = AsRef::<dyn std::error::Error>::as_ref(&e),
"failed to execute transaction, not including in block"
);
Expand All @@ -552,7 +553,7 @@ impl App {
// if it's invalid due to the nonce being too low, it'll be
// removed from the mempool in `update_mempool_after_finalization`.
if e.downcast_ref::<InvalidNonce>().is_some() {
txs_to_readd_to_mempool.push((tx, priority));
txs_to_readd_to_mempool.push((enqueued_tx, priority));
}
}
}
Expand All @@ -566,14 +567,7 @@ impl App {
);
}

self.mempool
.insert_all(txs_to_readd_to_mempool)
.await
.expect(
"priority transaction nonce and transaction nonce matches for each tx, as they \
were just popped from the mempool and not modified",
);

self.mempool.insert_all(txs_to_readd_to_mempool).await;
let mempool_len = self.mempool.len().await;
debug!(mempool_len, "finished executing transactions from mempool");

Expand Down Expand Up @@ -634,7 +628,7 @@ impl App {
}

// execute tx and store in `execution_results` list on success
match self.execute_transaction(tx.clone()).await {
match self.execute_transaction(Arc::new(tx.clone())).await {
Ok(events) => {
execution_results.push(ExecTxResult {
events,
Expand Down Expand Up @@ -799,12 +793,12 @@ impl App {
for tx in finalize_block.txs.iter().skip(2) {
// remove any included txs from the mempool
let tx_hash = Sha256::digest(tx).into();
self.mempool.remove(&tx_hash).await;
self.mempool.remove(tx_hash).await;

let signed_tx = signed_transaction_from_bytes(tx)
.context("protocol error; only valid txs should be finalized")?;

match self.execute_transaction(signed_tx).await {
match self.execute_transaction(Arc::new(signed_tx)).await {
Ok(events) => tx_results.push(ExecTxResult {
events,
..Default::default()
Expand Down Expand Up @@ -972,8 +966,8 @@ impl App {
))]
pub(crate) async fn execute_transaction(
&mut self,
signed_tx: astria_core::protocol::transaction::v1alpha1::SignedTransaction,
) -> anyhow::Result<Vec<abci::Event>> {
signed_tx: Arc<SignedTransaction>,
) -> anyhow::Result<Vec<Event>> {
let signed_tx_2 = signed_tx.clone();
let stateless =
tokio::spawn(async move { transaction::check_stateless(&signed_tx_2).await });
Expand Down Expand Up @@ -1130,33 +1124,10 @@ async fn update_mempool_after_finalization<S: StateReadExt>(
mempool: &mut Mempool,
state: S,
) -> anyhow::Result<()> {
use crate::mempool::TransactionPriority;

let mut txs_to_remove = Vec::new();

for (tx, priority) in mempool.write().await.iter_mut() {
match TransactionPriority::new(
tx.nonce(),
state
.get_account_nonce(*tx.verification_key().address())
.await
.context("failed to fetch account nonce")?,
) {
Ok(new_priority) => *priority = new_priority,
Err(e) => {
let tx_hash = tx.sha256_of_proto_encoding();
debug!(
transaction_hash = %telemetry::display::base64(&tx_hash),
error = AsRef::<dyn std::error::Error>::as_ref(&e),
"account nonce is now greater than tx nonce; dropping tx from mempool",
);
txs_to_remove.push(tx_hash);
}
};
}

mempool.remove_all(&txs_to_remove).await;
Ok(())
let current_account_nonce_getter = |address: Address| state.get_account_nonce(address);
mempool
.update_priorities(current_account_nonce_getter)
.await
}

/// relevant data of a block being executed.
Expand Down
77 changes: 5 additions & 72 deletions crates/astria-sequencer/src/app/tests_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ use crate::{
StateWriteExt,
},
genesis::Account,
mempool::TransactionPriority,
proposal::commitment::generate_rollup_datas_commitment,
state_ext::StateReadExt as _,
};
Expand Down Expand Up @@ -463,10 +462,7 @@ async fn app_execution_results_match_proposal_vs_after_proposal() {
// don't commit the result, now call prepare_proposal with the same data.
// this will reset the app state.
// this simulates executing the same block as a validator (specifically the proposer).
app.mempool
.insert(signed_tx, TransactionPriority::new(0, 0).unwrap())
.await
.unwrap();
app.mempool.insert(signed_tx, 0).await.unwrap();

let proposer_address = [88u8; 20].to_vec().try_into().unwrap();
let prepare_proposal = PrepareProposal {
Expand Down Expand Up @@ -575,14 +571,8 @@ async fn app_prepare_proposal_cometbft_max_bytes_overflow_ok() {
}
.into_signed(&alice_signing_key);

app.mempool
.insert(tx_pass, TransactionPriority::new(0, 0).unwrap())
.await
.unwrap();
app.mempool
.insert(tx_overflow, TransactionPriority::new(1, 0).unwrap())
.await
.unwrap();
app.mempool.insert(tx_pass, 0).await.unwrap();
app.mempool.insert(tx_overflow, 0).await.unwrap();

// send to prepare_proposal
let prepare_args = abci::request::PrepareProposal {
Expand Down Expand Up @@ -654,14 +644,8 @@ async fn app_prepare_proposal_sequencer_max_bytes_overflow_ok() {
}
.into_signed(&alice_signing_key);

app.mempool
.insert(tx_pass, TransactionPriority::new(0, 0).unwrap())
.await
.unwrap();
app.mempool
.insert(tx_overflow, TransactionPriority::new(1, 0).unwrap())
.await
.unwrap();
app.mempool.insert(tx_pass, 0).await.unwrap();
app.mempool.insert(tx_overflow, 0).await.unwrap();

// send to prepare_proposal
let prepare_args = abci::request::PrepareProposal {
Expand Down Expand Up @@ -755,54 +739,3 @@ async fn app_end_block_validator_updates() {
assert_eq!(validator_c.power, 100u32.into());
assert_eq!(app.state.get_validator_updates().await.unwrap().len(), 0);
}

#[tokio::test]
async fn update_mempool_after_finalization_update_account_nonce() {
let mut mempool = Mempool::new();

let storage = cnidarium::TempStorage::new().await.unwrap();
let snapshot = storage.latest_snapshot();

// insert tx with nonce 1, account nonce is 0
let tx = get_mock_tx(1);
let address = *tx.verification_key().address();
let priority = TransactionPriority::new(1, 0).unwrap();
mempool.insert(tx.clone(), priority).await.unwrap();

// update account nonce to 1
let mut state_tx = StateDelta::new(snapshot.clone());
state_tx.put_account_nonce(address, 1).unwrap();
storage.commit(state_tx).await.unwrap();

// ensure that mempool tx priority was updated
update_mempool_after_finalization(&mut mempool, storage.latest_snapshot())
.await
.unwrap();
let (_, priority) = mempool.pop().await.unwrap();
assert_eq!(priority, TransactionPriority::new(1, 1).unwrap());
}

#[tokio::test]
async fn update_mempool_after_finalization_remove_tx_if_nonce_too_low() {
let mut mempool = Mempool::new();

let storage = cnidarium::TempStorage::new().await.unwrap();
let snapshot = storage.latest_snapshot();

// insert tx with nonce 1, account nonce is 1
let tx = get_mock_tx(1);
let address = *tx.verification_key().address();
let priority = TransactionPriority::new(1, 1).unwrap();
mempool.insert(tx.clone(), priority).await.unwrap();

// update account nonce to 2
let mut state_tx = StateDelta::new(snapshot.clone());
state_tx.put_account_nonce(address, 2).unwrap();
storage.commit(state_tx).await.unwrap();

// ensure that tx was removed from mempool
update_mempool_after_finalization(&mut mempool, storage.latest_snapshot())
.await
.unwrap();
assert!(mempool.pop().await.is_none());
}
10 changes: 7 additions & 3 deletions crates/astria-sequencer/src/app/tests_breaking_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
//! Note: there are two actions not tested here: `Ics20Withdrawal` and `IbcRelay`.
//! These are due to the extensive setup needed to test them.
//! If changes are made to the execution results of these actions, manual testing is required.
use std::collections::HashMap;

use std::{
collections::HashMap,
sync::Arc,
};

use astria_core::{
primitive::v1::{
Expand Down Expand Up @@ -240,7 +244,7 @@ async fn app_execute_transaction_with_every_action_snapshot() {
],
};

let signed_tx = tx.into_signed(&alice_signing_key);
let signed_tx = Arc::new(tx.into_signed(&alice_signing_key));
app.execute_transaction(signed_tx).await.unwrap();

// execute BridgeUnlock action
Expand All @@ -260,7 +264,7 @@ async fn app_execute_transaction_with_every_action_snapshot() {
],
};

let signed_tx = tx.into_signed(&bridge_signing_key);
let signed_tx = Arc::new(tx.into_signed(&bridge_signing_key));
app.execute_transaction(signed_tx).await.unwrap();

app.prepare_commit(storage.clone()).await.unwrap();
Expand Down
Loading