Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion charts/sequencer/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.23.1
version: 0.23.2
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
Expand Down
1 change: 1 addition & 0 deletions charts/sequencer/templates/configmaps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,6 @@ data:
OTEL_SERVICE_NAME: "{{ tpl .Values.sequencer.otel.serviceName . }}"
{{- if not .Values.global.dev }}
{{- else }}
ASTRIA_SEQUENCER_MEMPOOL_PARKED_MAX_TX_COUNT: "{{ .Values.sequencer.mempool.parked.maxTxCount }}"
{{- end }}
---
3 changes: 3 additions & 0 deletions charts/sequencer/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ genesis:
# pubKey: lV57+rGs2vac7mvkGHP1oBFGHPJM3a+WoAzeFDCJDNU=

sequencer:
mempool:
parked:
maxTxCount: 200
metrics:
enabled: false
otel:
Expand Down
2 changes: 2 additions & 0 deletions crates/astria-core/src/protocol/abci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ impl AbciErrorCode {
pub const ALREADY_PRESENT: Self = Self(unsafe { NonZeroU32::new_unchecked(14) });
pub const NONCE_TAKEN: Self = Self(unsafe { NonZeroU32::new_unchecked(15) });
pub const ACCOUNT_SIZE_LIMIT: Self = Self(unsafe { NonZeroU32::new_unchecked(16) });
pub const PARKED_FULL: Self = Self(unsafe { NonZeroU32::new_unchecked(17) });
}

impl AbciErrorCode {
Expand Down Expand Up @@ -62,6 +63,7 @@ impl AbciErrorCode {
Self::ACCOUNT_SIZE_LIMIT => {
"the account has reached the maximum number of parked transactions".into()
}
Self::PARKED_FULL => "the mempool is out of space for more parked transactions".into(),
Self(other) => {
format!("invalid error code {other}: should be unreachable (this is a bug)")
}
Expand Down
3 changes: 3 additions & 0 deletions crates/astria-sequencer/local.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ ASTRIA_SEQUENCER_DB_FILEPATH="/tmp/astria_db"
# Only used if the "mint" feature is enabled
ASTRIA_SEQUENCER_ENABLE_MINT=false

# Set size of mempool's parked container
ASTRIA_SEQUENCER_MEMPOOL_PARKED_MAX_TX_COUNT=200

# Socket address for gRPC server
ASTRIA_SEQUENCER_GRPC_ADDR="127.0.0.1:8080"
# Log level for the sequencer
Expand Down
2 changes: 1 addition & 1 deletion crates/astria-sequencer/src/app/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ pub(crate) async fn initialize_app_with_storage(
.expect("failed to create temp storage backing chain state");
let snapshot = storage.latest_snapshot();
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let mempool = Mempool::new(metrics, 100);
let mut app = App::new(snapshot, mempool, metrics).await.unwrap();

let genesis_state = genesis_state.unwrap_or_else(self::genesis_state);
Expand Down
2 changes: 2 additions & 0 deletions crates/astria-sequencer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub struct Config {
pub metrics_http_listener_addr: String,
/// Writes a human readable format to stdout instead of JSON formatted OTEL trace data.
pub pretty_print: bool,
/// The maximum number of transactions that can be parked in the mempool.
pub mempool_parked_max_tx_count: usize,
}

impl config::Config for Config {
Expand Down
6 changes: 3 additions & 3 deletions crates/astria-sequencer/src/grpc/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ mod tests {
let block = make_test_sequencer_block(1);
let storage = cnidarium::TempStorage::new().await.unwrap();
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let mempool = Mempool::new(metrics, 100);
let mut state_tx = StateDelta::new(storage.latest_snapshot());
state_tx.put_block_height(1).unwrap();
state_tx.put_sequencer_block(block).unwrap();
Expand All @@ -277,7 +277,7 @@ mod tests {
async fn get_pending_nonce_in_mempool() {
let storage = cnidarium::TempStorage::new().await.unwrap();
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let mempool = Mempool::new(metrics, 100);

let alice = get_alice_signing_key();
let alice_address = astria_address(&alice.address_bytes());
Expand Down Expand Up @@ -328,7 +328,7 @@ mod tests {

let storage = cnidarium::TempStorage::new().await.unwrap();
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let mempool = Mempool::new(metrics, 100);
let mut state_tx = StateDelta::new(storage.latest_snapshot());
let alice = get_alice_signing_key();
let alice_address = astria_address(&alice.address_bytes());
Expand Down
2 changes: 1 addition & 1 deletion crates/astria-sequencer/src/mempool/benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ fn init_mempool<T: MempoolSize>() -> Mempool {
.build()
.unwrap();
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let mempool = Mempool::new(metrics, T::size());
let account_mock_balance = mock_balances(0, 0);
let tx_mock_cost = mock_tx_cost(0, 0, 0);
runtime.block_on(async {
Expand Down
69 changes: 52 additions & 17 deletions crates/astria-sequencer/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use transactions_container::{
ParkedTransactions,
PendingTransactions,
TimemarkedTransaction,
TransactionsContainer as _,
};

use crate::{
Expand Down Expand Up @@ -170,10 +171,13 @@ pub(crate) struct Mempool {

impl Mempool {
#[must_use]
pub(crate) fn new(metrics: &'static Metrics) -> Self {
pub(crate) fn new(metrics: &'static Metrics, parked_max_tx_count: usize) -> Self {
Self {
pending: Arc::new(RwLock::new(PendingTransactions::new(TX_TTL))),
parked: Arc::new(RwLock::new(ParkedTransactions::new(TX_TTL))),
parked: Arc::new(RwLock::new(ParkedTransactions::new(
TX_TTL,
parked_max_tx_count,
))),
comet_bft_removal_cache: Arc::new(RwLock::new(RemovalCache::new(
NonZeroUsize::try_from(REMOVAL_CACHE_SIZE)
.expect("Removal cache cannot be zero sized"),
Expand Down Expand Up @@ -227,6 +231,10 @@ impl Mempool {
&current_account_balances,
) {
Ok(()) => {
// log current size of parked
self.metrics
.set_transactions_in_mempool_parked(parked.len());

// track in contained txs
self.lock_contained_txs().await.add(id);
Ok(())
Expand All @@ -238,7 +246,8 @@ impl Mempool {
InsertionError::AlreadyPresent
| InsertionError::NonceTooLow
| InsertionError::NonceTaken
| InsertionError::AccountSizeLimit,
| InsertionError::AccountSizeLimit
| InsertionError::ParkedSizeLimit,
) => error,
Ok(()) => {
// check parked for txs able to be promoted
Expand Down Expand Up @@ -441,8 +450,8 @@ impl Mempool {
let tx_id = tx.id();
if let Err(error) = parked.add(tx, current_nonce, &current_balances) {
// NOTE: this shouldn't happen normally but could on the edge case of
// the parked queue being full for the account. This also means
// grabbing the lock inside the loop is more performant.
// the parked queue being full for the account or globally.
// Grabbing the lock inside the loop should be more performant.
self.lock_contained_txs().await.remove(tx_id);
self.metrics.increment_internal_logic_error();
error!(
Expand Down Expand Up @@ -514,7 +523,7 @@ mod tests {
#[tokio::test]
async fn insert() {
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let mempool = Mempool::new(metrics, 100);
let account_balances = mock_balances(100, 100);
let tx_cost = mock_tx_cost(10, 10, 0);

Expand Down Expand Up @@ -550,7 +559,7 @@ mod tests {
tx1_replacement.clone(),
0,
account_balances.clone(),
tx_cost.clone()
tx_cost.clone(),
)
.await
.unwrap_err(),
Expand Down Expand Up @@ -578,7 +587,7 @@ mod tests {
// odder edge cases that can be hit if a node goes offline or fails to see
// some transactions that other nodes include into their proposed blocks.
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let mempool = Mempool::new(metrics, 100);
let account_balances = mock_balances(100, 100);
let tx_cost = mock_tx_cost(10, 10, 0);

Expand Down Expand Up @@ -680,7 +689,7 @@ mod tests {
#[tokio::test]
async fn run_maintenance_promotion() {
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let mempool = Mempool::new(metrics, 100);

// create transaction setup to trigger promotions
//
Expand Down Expand Up @@ -753,7 +762,7 @@ mod tests {
#[tokio::test]
async fn run_maintenance_demotion() {
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let mempool = Mempool::new(metrics, 100);

// create transaction setup to trigger demotions
//
Expand Down Expand Up @@ -848,7 +857,7 @@ mod tests {
#[tokio::test]
async fn remove_invalid() {
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let mempool = Mempool::new(metrics, 100);
let account_balances = mock_balances(100, 100);
let tx_cost = mock_tx_cost(10, 10, 10);

Expand Down Expand Up @@ -951,7 +960,7 @@ mod tests {
#[tokio::test]
async fn should_get_pending_nonce() {
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let mempool = Mempool::new(metrics, 100);

let account_balances = mock_balances(100, 100);
let tx_cost = mock_tx_cost(10, 10, 0);
Expand Down Expand Up @@ -985,7 +994,7 @@ mod tests {
tx100.clone(),
100,
account_balances.clone(),
tx_cost.clone()
tx_cost.clone(),
)
.await
.is_ok(),
Expand All @@ -1001,7 +1010,7 @@ mod tests {
tx101.clone(),
100,
account_balances.clone(),
tx_cost.clone()
tx_cost.clone(),
)
.await
.is_ok(),
Expand Down Expand Up @@ -1094,7 +1103,7 @@ mod tests {
#[tokio::test]
async fn tx_tracked_invalid_removal_removes_all() {
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let mempool = Mempool::new(metrics, 100);
let account_balances = mock_balances(100, 100);
let tx_cost = mock_tx_cost(10, 10, 0);

Expand Down Expand Up @@ -1128,7 +1137,7 @@ mod tests {
#[tokio::test]
async fn tx_tracked_maintenance_removes_all() {
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let mempool = Mempool::new(metrics, 100);
let account_balances = mock_balances(100, 100);
let tx_cost = mock_tx_cost(10, 10, 0);

Expand Down Expand Up @@ -1161,7 +1170,7 @@ mod tests {
#[tokio::test]
async fn tx_tracked_reinsertion_ok() {
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let mempool = Mempool::new(metrics, 100);
let account_balances = mock_balances(100, 100);
let tx_cost = mock_tx_cost(10, 10, 0);

Expand Down Expand Up @@ -1200,4 +1209,30 @@ mod tests {
assert!(mempool.is_tracked(tx0.id().get()).await);
assert!(mempool.is_tracked(tx1.id().get()).await);
}

#[tokio::test]
async fn parked_limit_enforced() {
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics, 1);
let account_balances = mock_balances(100, 100);
let tx_cost = mock_tx_cost(10, 10, 0);

let tx0 = MockTxBuilder::new().nonce(1).build();
let tx1 = MockTxBuilder::new().nonce(2).build();

mempool
.insert(tx1.clone(), 0, account_balances.clone(), tx_cost.clone())
.await
.unwrap();

// size limit fails as expected
assert_eq!(
mempool
.insert(tx0.clone(), 0, account_balances.clone(), tx_cost.clone())
.await
.unwrap_err(),
InsertionError::ParkedSizeLimit,
"size limit should be enforced"
);
}
}
Loading