Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion crates/astria-core/src/protocol/abci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ impl AbciErrorCode {
pub const VALUE_NOT_FOUND: Self = Self(unsafe { NonZeroU32::new_unchecked(8) });
pub const TRANSACTION_EXPIRED: Self = Self(unsafe { NonZeroU32::new_unchecked(9) });
pub const TRANSACTION_FAILED: Self = Self(unsafe { NonZeroU32::new_unchecked(10) });
pub const BAD_REQUEST: Self = Self(unsafe { NonZeroU32::new_unchecked(11) });
pub const TRANSACTION_INSERTION_FAILED: Self = Self(unsafe { NonZeroU32::new_unchecked(11) });
pub const LOWER_NONCE_INVALIDATED: Self = Self(unsafe { NonZeroU32::new_unchecked(12) });
pub const BAD_REQUEST: Self = Self(unsafe { NonZeroU32::new_unchecked(13) });
}

impl AbciErrorCode {
Expand All @@ -42,6 +44,10 @@ impl AbciErrorCode {
Self::TRANSACTION_FAILED => {
"the transaction failed to execute in prepare_proposal()".into()
}
Self::TRANSACTION_INSERTION_FAILED => {
"the transaction failed insertion into the mempool".into()
}
Self::LOWER_NONCE_INVALIDATED => "lower nonce was invalidated in mempool".into(),
Self::BAD_REQUEST => "the request payload was malformed".into(),
Self(other) => {
format!("invalid error code {other}: should be unreachable (this is a bug)")
Expand Down
1 change: 0 additions & 1 deletion crates/astria-sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ cnidarium = { git = "https://github.com/penumbra-zone/penumbra.git", tag = "v0.7
] }
ibc-proto = { version = "0.41.0", features = ["server"] }
matchit = "0.7.2"
priority-queue = "2.0.2"
tower = "0.4"
tower-abci = "0.12.0"
tower-actor = "0.1.0"
Expand Down
68 changes: 37 additions & 31 deletions crates/astria-sequencer/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ use tracing::{
};

use crate::{
accounts,
accounts::{
self,
component::AccountsComponent,
StateReadExt,
StateWriteExt as _,
},
address::StateWriteExt as _,
Expand Down Expand Up @@ -484,11 +485,21 @@ impl App {
let mut included_signed_txs = Vec::new();
let mut failed_tx_count: usize = 0;
let mut execution_results = Vec::new();
let mut txs_to_readd_to_mempool = Vec::new();
let mut excluded_txs: usize = 0;

// get copy of transactions to execute from mempool
let current_account_nonce_getter =
|address: [u8; 20]| self.state.get_account_nonce(address);
let pending_txs = self
.mempool
.builder_queue(current_account_nonce_getter)
.await
.expect("failed to fetch pending transactions");

while let Some((enqueued_tx, priority)) = self.mempool.pop().await {
let tx_hash_base64 = telemetry::display::base64(&enqueued_tx.tx_hash()).to_string();
let tx = enqueued_tx.signed_tx();
let mut unused_count = pending_txs.len();
for (tx_hash, tx) in pending_txs {
unused_count = unused_count.saturating_sub(1);
let tx_hash_base64 = telemetry::display::base64(&tx_hash).to_string();
let bytes = tx.to_raw().encode_to_vec();
let tx_len = bytes.len();
info!(transaction_hash = %tx_hash_base64, "executing transaction");
Expand All @@ -503,7 +514,7 @@ impl App {
tx_data_bytes = tx_len,
"excluding remaining transactions: max cometBFT data limit reached"
);
txs_to_readd_to_mempool.push((enqueued_tx, priority));
excluded_txs = excluded_txs.saturating_add(1);

// break from loop, as the block is full
break;
Expand All @@ -526,7 +537,7 @@ impl App {
tx_data_bytes = tx_sequence_data_bytes,
"excluding transaction: max block sequenced data limit reached"
);
txs_to_readd_to_mempool.push((enqueued_tx, priority));
excluded_txs = excluded_txs.saturating_add(1);

// continue as there might be non-sequence txs that can fit
continue;
Expand Down Expand Up @@ -558,18 +569,21 @@ impl App {
);

if e.downcast_ref::<InvalidNonce>().is_some() {
// we re-insert the tx into the mempool if it failed to execute
// we don't remove the tx from mempool if it failed to execute
// due to an invalid nonce, as it may be valid in the future.
// if it's invalid due to the nonce being too low, it'll be
// removed from the mempool in `update_mempool_after_finalization`.
txs_to_readd_to_mempool.push((enqueued_tx, priority));
} else {
failed_tx_count = failed_tx_count.saturating_add(1);

// the transaction should be removed from the cometbft mempool
// remove the failing transaction from the mempool
//
// this will remove any transactions from the same sender
// as well, as the dependent nonces will not be able
// to execute
self.mempool
.track_removal_comet_bft(
enqueued_tx.tx_hash(),
.remove_tx_invalid(
tx,
RemovalReason::FailedPrepareProposal(e.to_string()),
)
.await;
Expand All @@ -586,15 +600,12 @@ impl App {
);
}
self.metrics.set_prepare_proposal_excluded_transactions(
txs_to_readd_to_mempool
.len()
.saturating_add(failed_tx_count),
excluded_txs.saturating_add(failed_tx_count),
);

self.mempool.insert_all(txs_to_readd_to_mempool).await;
let mempool_len = self.mempool.len().await;
debug!(mempool_len, "finished executing transactions from mempool");
self.metrics.set_transactions_in_mempool_total(mempool_len);
debug!("{unused_count} leftover pending transactions");
self.metrics
.set_transactions_in_mempool_total(self.mempool.len().await);

self.execution_results = Some(execution_results);
Ok((validated_txs, included_signed_txs))
Expand Down Expand Up @@ -805,10 +816,6 @@ impl App {

// skip the first two transactions, as they are the rollup data commitments
for tx in finalize_block.txs.iter().skip(2) {
// remove any included txs from the mempool
let tx_hash = Sha256::digest(tx).into();
self.mempool.remove(tx_hash).await;

let signed_tx = signed_transaction_from_bytes(tx)
.context("protocol error; only valid txs should be finalized")?;

Expand Down Expand Up @@ -880,6 +887,10 @@ impl App {
state_tx
.put_sequencer_block(sequencer_block)
.context("failed to write sequencer block to state")?;

// update the priority of any txs in the mempool based on the updated app state
update_mempool_after_finalization(&mut self.mempool, &state_tx).await;

// events that occur after end_block are ignored here;
// there should be none anyways.
let _ = self.apply(state_tx);
Expand All @@ -890,11 +901,6 @@ impl App {
.await
.context("failed to prepare commit")?;

// update the priority of any txs in the mempool based on the updated app state
update_mempool_after_finalization(&mut self.mempool, self.state.clone())
.await
.context("failed to update mempool after finalization")?;

Ok(abci::response::FinalizeBlock {
events: end_block.events,
validator_updates: end_block.validator_updates,
Expand Down Expand Up @@ -1121,10 +1127,10 @@ impl App {
// the mempool is large.
async fn update_mempool_after_finalization<S: accounts::StateReadExt>(
mempool: &mut Mempool,
state: S,
) -> anyhow::Result<()> {
state: &S,
) {
let current_account_nonce_getter = |address: [u8; 20]| state.get_account_nonce(address);
mempool.run_maintenance(current_account_nonce_getter).await
mempool.run_maintenance(current_account_nonce_getter).await;
}

/// relevant data of a block being executed.
Expand Down
12 changes: 9 additions & 3 deletions crates/astria-sequencer/src/app/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use astria_core::{
crypto::SigningKey,
primitive::v1::RollupId,
Expand Down Expand Up @@ -138,21 +140,25 @@ pub(crate) async fn initialize_app(
app
}

pub(crate) fn get_mock_tx(nonce: u32) -> SignedTransaction {
pub(crate) fn mock_tx(
nonce: u32,
signer: &SigningKey,
rollup_name: &str,
) -> Arc<SignedTransaction> {
let tx = UnsignedTransaction {
params: TransactionParams::builder()
.nonce(nonce)
.chain_id("test")
.build(),
actions: vec![
SequenceAction {
rollup_id: RollupId::from_unhashed_bytes([0; 32]),
rollup_id: RollupId::from_unhashed_bytes(rollup_name.as_bytes()),
data: Bytes::from_static(&[0x99]),
fee_asset: "astria".parse().unwrap(),
}
.into(),
],
};

tx.into_signed(&get_alice_signing_key())
Arc::new(tx.into_signed(signer))
}
28 changes: 23 additions & 5 deletions crates/astria-sequencer/src/app/tests_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ async fn app_execution_results_match_proposal_vs_after_proposal() {
// don't commit the result, now call prepare_proposal with the same data.
// this will reset the app state.
// this simulates executing the same block as a validator (specifically the proposer).
app.mempool.insert(signed_tx, 0).await.unwrap();
app.mempool.insert(Arc::new(signed_tx), 0).await.unwrap();

let proposer_address = [88u8; 20].to_vec().try_into().unwrap();
let prepare_proposal = PrepareProposal {
Expand All @@ -473,6 +473,12 @@ async fn app_execution_results_match_proposal_vs_after_proposal() {
assert_eq!(prepare_proposal_result.txs, finalize_block.txs);
assert_eq!(app.executed_proposal_hash, Hash::default());
assert_eq!(app.validator_address.unwrap(), proposer_address);
// run maintence to clear out transactions
let current_account_nonce_getter = |address: [u8; 20]| app.state.get_account_nonce(address);
app.mempool
.run_maintenance(current_account_nonce_getter)
.await;

assert_eq!(app.mempool.len().await, 0);

// call process_proposal - should not re-execute anything.
Expand Down Expand Up @@ -561,8 +567,8 @@ async fn app_prepare_proposal_cometbft_max_bytes_overflow_ok() {
}
.into_signed(&alice);

app.mempool.insert(tx_pass, 0).await.unwrap();
app.mempool.insert(tx_overflow, 0).await.unwrap();
app.mempool.insert(Arc::new(tx_pass), 0).await.unwrap();
app.mempool.insert(Arc::new(tx_overflow), 0).await.unwrap();

// send to prepare_proposal
let prepare_args = abci::request::PrepareProposal {
Expand All @@ -581,6 +587,12 @@ async fn app_prepare_proposal_cometbft_max_bytes_overflow_ok() {
.await
.expect("too large transactions should not cause prepare proposal to fail");

// run maintence to clear out transactions
let current_account_nonce_getter = |address: [u8; 20]| app.state.get_account_nonce(address);
app.mempool
.run_maintenance(current_account_nonce_getter)
.await;

// see only first tx made it in
assert_eq!(
result.txs.len(),
Expand Down Expand Up @@ -634,8 +646,8 @@ async fn app_prepare_proposal_sequencer_max_bytes_overflow_ok() {
}
.into_signed(&alice);

app.mempool.insert(tx_pass, 0).await.unwrap();
app.mempool.insert(tx_overflow, 0).await.unwrap();
app.mempool.insert(Arc::new(tx_pass), 0).await.unwrap();
app.mempool.insert(Arc::new(tx_overflow), 0).await.unwrap();

// send to prepare_proposal
let prepare_args = abci::request::PrepareProposal {
Expand All @@ -654,6 +666,12 @@ async fn app_prepare_proposal_sequencer_max_bytes_overflow_ok() {
.await
.expect("too large transactions should not cause prepare proposal to fail");

// run maintence to clear out transactions
let current_account_nonce_getter = |address: [u8; 20]| app.state.get_account_nonce(address);
app.mempool
.run_maintenance(current_account_nonce_getter)
.await;

// see only first tx made it in
assert_eq!(
result.txs.len(),
Expand Down
19 changes: 13 additions & 6 deletions crates/astria-sequencer/src/grpc/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,20 @@ mod test {

let alice = get_alice_signing_key();
let alice_address = astria_address(&alice.address_bytes());
let nonce = 99;
let tx = crate::app::test_utils::get_mock_tx(nonce);
// insert a transaction with a nonce gap
let gapped_nonce = 99;
let tx = crate::app::test_utils::mock_tx(gapped_nonce, &get_alice_signing_key(), "test");
mempool.insert(tx, 0).await.unwrap();

// insert a tx with lower nonce also, but we should get the highest nonce
let lower_nonce = 98;
let tx = crate::app::test_utils::get_mock_tx(lower_nonce);
// insert a transaction at the current nonce
let account_nonce = 0;
let tx = crate::app::test_utils::mock_tx(account_nonce, &get_alice_signing_key(), "test");
mempool.insert(tx, 0).await.unwrap();

// insert a transactions one above account nonce (not gapped)
let sequential_nonce = 1;
let tx: Arc<astria_core::protocol::transaction::v1alpha1::SignedTransaction> =
crate::app::test_utils::mock_tx(sequential_nonce, &get_alice_signing_key(), "test");
mempool.insert(tx, 0).await.unwrap();

let server = Arc::new(SequencerServer::new(storage.clone(), mempool));
Expand All @@ -279,7 +286,7 @@ mod test {
};
let request = Request::new(request);
let response = server.get_pending_nonce(request).await.unwrap();
assert_eq!(response.into_inner().inner, nonce);
assert_eq!(response.into_inner().inner, sequential_nonce);
}

#[tokio::test]
Expand Down
Loading