Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/astria-sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ priority-queue = "2.0.2"
tower = "0.4"
tower-abci = "0.12.0"
tower-actor = "0.1.0"
cnidarium = { git = "https://github.com/penumbra-zone/penumbra.git", tag = "v0.78.0" }
cnidarium = { git = "https://github.com/penumbra-zone/penumbra.git", tag = "v0.78.0", features = [
"metrics",
] }
cnidarium-component = { git = "https://github.com/penumbra-zone/penumbra.git", tag = "v0.78.0" }

async-trait = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions crates/astria-sequencer/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,7 @@ impl App {
self.mempool.insert_all(txs_to_readd_to_mempool).await;
let mempool_len = self.mempool.len().await;
debug!(mempool_len, "finished executing transactions from mempool");
self.metrics.set_transactions_in_mempool_total(mempool_len);

self.execution_results = Some(execution_results);
Ok((validated_txs, included_signed_txs))
Expand Down
154 changes: 154 additions & 0 deletions crates/astria-sequencer/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use metrics::{
counter,
describe_counter,
Expand All @@ -12,6 +14,8 @@ use metrics::{
};
use telemetry::metric_names;

const CHECK_TX_STAGE: &str = "stage";

pub(crate) struct Metrics {
prepare_proposal_excluded_transactions_cometbft_space: Counter,
prepare_proposal_excluded_transactions_sequencer_space: Counter,
Expand All @@ -26,6 +30,16 @@ pub(crate) struct Metrics {
check_tx_removed_failed_stateless: Counter,
check_tx_removed_stale_nonce: Counter,
check_tx_removed_account_balance: Counter,
check_tx_duration_seconds_parse_tx: Histogram,
check_tx_duration_seconds_check_stateless: Histogram,
check_tx_duration_seconds_check_nonce: Histogram,
check_tx_duration_seconds_check_chain_id: Histogram,
check_tx_duration_seconds_check_balance: Histogram,
check_tx_duration_seconds_check_removed: Histogram,
check_tx_duration_seconds_insert_to_app_mempool: Histogram,
actions_per_transaction_in_mempool: Histogram,
transaction_in_mempool_size_bytes: Histogram,
transactions_in_mempool_total: Gauge,
}

impl Metrics {
Expand Down Expand Up @@ -135,6 +149,62 @@ impl Metrics {
);
let check_tx_removed_expired = counter!(CHECK_TX_REMOVED_EXPIRED);

describe_histogram!(
CHECK_TX_DURATION_SECONDS,
Unit::Seconds,
"The amount of time taken in seconds to successfully complete the various stages of \
check_tx"
);
let check_tx_duration_seconds_parse_tx = histogram!(
CHECK_TX_DURATION_SECONDS,
CHECK_TX_STAGE => "length check and parse raw tx"
);
let check_tx_duration_seconds_check_stateless = histogram!(
CHECK_TX_DURATION_SECONDS,
CHECK_TX_STAGE => "stateless check"
);
let check_tx_duration_seconds_check_nonce = histogram!(
CHECK_TX_DURATION_SECONDS,
CHECK_TX_STAGE => "nonce check"
);
let check_tx_duration_seconds_check_chain_id = histogram!(
CHECK_TX_DURATION_SECONDS,
CHECK_TX_STAGE => "chain id check"
);
let check_tx_duration_seconds_check_balance = histogram!(
CHECK_TX_DURATION_SECONDS,
CHECK_TX_STAGE => "balance check"
);
let check_tx_duration_seconds_check_removed = histogram!(
CHECK_TX_DURATION_SECONDS,
CHECK_TX_STAGE => "check for removal"
);
let check_tx_duration_seconds_insert_to_app_mempool = histogram!(
CHECK_TX_DURATION_SECONDS,
CHECK_TX_STAGE => "insert to app mempool"
);

describe_histogram!(
ACTIONS_PER_TRANSACTION_IN_MEMPOOL,
Unit::Count,
"The number of actions in a transaction added to the app mempool"
);
let actions_per_transaction_in_mempool = histogram!(ACTIONS_PER_TRANSACTION_IN_MEMPOOL);

describe_histogram!(
TRANSACTION_IN_MEMPOOL_SIZE_BYTES,
Unit::Bytes,
"The number of bytes in a transaction added to the app mempool"
);
let transaction_in_mempool_size_bytes = histogram!(TRANSACTION_IN_MEMPOOL_SIZE_BYTES);

describe_gauge!(
TRANSACTIONS_IN_MEMPOOL_TOTAL,
Unit::Count,
"The number of transactions in the app mempool"
);
let transactions_in_mempool_total = gauge!(TRANSACTIONS_IN_MEMPOOL_TOTAL);

Self {
prepare_proposal_excluded_transactions_cometbft_space,
prepare_proposal_excluded_transactions_sequencer_space,
Expand All @@ -149,6 +219,16 @@ impl Metrics {
check_tx_removed_failed_stateless,
check_tx_removed_stale_nonce,
check_tx_removed_account_balance,
check_tx_duration_seconds_parse_tx,
check_tx_duration_seconds_check_stateless,
check_tx_duration_seconds_check_nonce,
check_tx_duration_seconds_check_chain_id,
check_tx_duration_seconds_check_balance,
check_tx_duration_seconds_check_removed,
check_tx_duration_seconds_insert_to_app_mempool,
actions_per_transaction_in_mempool,
transaction_in_mempool_size_bytes,
transactions_in_mempool_total,
}
}

Expand Down Expand Up @@ -212,6 +292,59 @@ impl Metrics {
pub(crate) fn increment_check_tx_removed_account_balance(&self) {
self.check_tx_removed_account_balance.increment(1);
}

pub(crate) fn record_check_tx_duration_seconds_parse_tx(&self, duration: Duration) {
self.check_tx_duration_seconds_parse_tx.record(duration);
}

pub(crate) fn record_check_tx_duration_seconds_check_stateless(&self, duration: Duration) {
self.check_tx_duration_seconds_check_stateless
.record(duration);
}

pub(crate) fn record_check_tx_duration_seconds_check_nonce(&self, duration: Duration) {
self.check_tx_duration_seconds_check_nonce.record(duration);
}

pub(crate) fn record_check_tx_duration_seconds_check_chain_id(&self, duration: Duration) {
self.check_tx_duration_seconds_check_chain_id
.record(duration);
}

pub(crate) fn record_check_tx_duration_seconds_check_balance(&self, duration: Duration) {
self.check_tx_duration_seconds_check_balance
.record(duration);
}

pub(crate) fn record_check_tx_duration_seconds_check_removed(&self, duration: Duration) {
self.check_tx_duration_seconds_check_removed
.record(duration);
}

pub(crate) fn record_check_tx_duration_seconds_insert_to_app_mempool(
&self,
duration: Duration,
) {
self.check_tx_duration_seconds_insert_to_app_mempool
.record(duration);
}

pub(crate) fn record_actions_per_transaction_in_mempool(&self, count: usize) {
// allow: precision loss is unlikely (values too small) but also unimportant in histograms.
#[allow(clippy::cast_precision_loss)]
self.actions_per_transaction_in_mempool.record(count as f64);
}

pub(crate) fn record_transaction_in_mempool_size_bytes(&self, count: usize) {
// allow: precision loss is unlikely (values too small) but also unimportant in histograms.
#[allow(clippy::cast_precision_loss)]
self.transaction_in_mempool_size_bytes.record(count as f64);
}

Comment on lines +332 to +343
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are set every time a tx is inserted (and is per-tx) right? would it be more/less useful to also have a mempool total?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering about that too. Thing is, it's relatively cheap to get the number of txs in the mempool (call to lock the RwLock and then a call to len()), but it could be expensive to track either of these (e.g. lock then iterate all txs).

If we do want this, it's probably best to track this inside the mempool. We'd want the tx size to be passed in via insert to avoid having to convert the signed tx back into PB form to reserialize it.

I'll merge this PR as is, and if anyone thinks we should add these metrics, I can do a follow-up PR.

pub(crate) fn set_transactions_in_mempool_total(&self, count: usize) {
#[allow(clippy::cast_precision_loss)]
self.transactions_in_mempool_total.set(count as f64);
}
}

metric_names!(pub const METRICS_NAMES:
Expand All @@ -228,11 +361,17 @@ metric_names!(pub const METRICS_NAMES:
CHECK_TX_REMOVED_FAILED_STATELESS,
CHECK_TX_REMOVED_STALE_NONCE,
CHECK_TX_REMOVED_ACCOUNT_BALANCE,
CHECK_TX_DURATION_SECONDS,
ACTIONS_PER_TRANSACTION_IN_MEMPOOL,
TRANSACTION_IN_MEMPOOL_SIZE_BYTES,
TRANSACTIONS_IN_MEMPOOL_TOTAL
);

#[cfg(test)]
mod tests {
use super::{
ACTIONS_PER_TRANSACTION_IN_MEMPOOL,
CHECK_TX_DURATION_SECONDS,
CHECK_TX_REMOVED_ACCOUNT_BALANCE,
CHECK_TX_REMOVED_EXPIRED,
CHECK_TX_REMOVED_FAILED_EXECUTION,
Expand All @@ -246,6 +385,8 @@ mod tests {
PROCESS_PROPOSAL_SKIPPED_PROPOSAL,
PROPOSAL_DEPOSITS,
PROPOSAL_TRANSACTIONS,
TRANSACTIONS_IN_MEMPOOL_TOTAL,
TRANSACTION_IN_MEMPOOL_SIZE_BYTES,
};

#[track_caller]
Expand Down Expand Up @@ -295,5 +436,18 @@ mod tests {
CHECK_TX_REMOVED_ACCOUNT_BALANCE,
"check_tx_removed_account_balance",
);
assert_const(CHECK_TX_DURATION_SECONDS, "check_tx_duration_seconds");
assert_const(
ACTIONS_PER_TRANSACTION_IN_MEMPOOL,
"actions_per_transaction_in_mempool",
);
assert_const(
TRANSACTION_IN_MEMPOOL_SIZE_BYTES,
"transaction_in_mempool_size_bytes",
);
assert_const(
TRANSACTIONS_IN_MEMPOOL_TOTAL,
"transactions_in_mempool_total",
);
}
}
3 changes: 3 additions & 0 deletions crates/astria-sequencer/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ impl Sequencer {
pub async fn run_until_stopped(config: Config) -> Result<()> {
static METRICS: OnceLock<Metrics> = OnceLock::new();
let metrics = METRICS.get_or_init(Metrics::new);
cnidarium::register_metrics();
metrics::histogram!("cnidarium_get_raw_duration_seconds");
metrics::histogram!("cnidarium_nonverifiable_get_raw_duration_seconds");

if config
.db_filepath
Expand Down
49 changes: 47 additions & 2 deletions crates/astria-sequencer/src/service/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
Context,
Poll,
},
time::Instant,
};

use astria_core::{
Expand Down Expand Up @@ -105,12 +106,16 @@ async fn handle_check_tx<S: StateReadExt + 'static>(
) -> response::CheckTx {
use sha2::Digest as _;

let tx_hash = sha2::Sha256::digest(&req.tx).into();
let start_parsing = Instant::now();

let request::CheckTx {
tx, ..
} = req;
if tx.len() > MAX_TX_SIZE {

let tx_hash = sha2::Sha256::digest(&tx).into();
let tx_len = tx.len();

if tx_len > MAX_TX_SIZE {
mempool.remove(tx_hash).await;
metrics.increment_check_tx_removed_too_large();
return response::CheckTx {
Expand Down Expand Up @@ -151,6 +156,11 @@ async fn handle_check_tx<S: StateReadExt + 'static>(
}
};

let finished_parsing = Instant::now();
metrics.record_check_tx_duration_seconds_parse_tx(
finished_parsing.saturating_duration_since(start_parsing),
);

if let Err(e) = transaction::check_stateless(&signed_tx).await {
mempool.remove(tx_hash).await;
metrics.increment_check_tx_removed_failed_stateless();
Expand All @@ -162,6 +172,11 @@ async fn handle_check_tx<S: StateReadExt + 'static>(
};
};

let finished_check_stateless = Instant::now();
metrics.record_check_tx_duration_seconds_check_stateless(
finished_check_stateless.saturating_duration_since(finished_parsing),
);

if let Err(e) = transaction::check_nonce_mempool(&signed_tx, &state).await {
mempool.remove(tx_hash).await;
metrics.increment_check_tx_removed_stale_nonce();
Expand All @@ -173,6 +188,11 @@ async fn handle_check_tx<S: StateReadExt + 'static>(
};
};

let finished_check_nonce = Instant::now();
metrics.record_check_tx_duration_seconds_check_nonce(
finished_check_nonce.saturating_duration_since(finished_check_stateless),
);

if let Err(e) = transaction::check_chain_id_mempool(&signed_tx, &state).await {
mempool.remove(tx_hash).await;
return response::CheckTx {
Expand All @@ -183,6 +203,11 @@ async fn handle_check_tx<S: StateReadExt + 'static>(
};
}

let finished_check_chain_id = Instant::now();
metrics.record_check_tx_duration_seconds_check_chain_id(
finished_check_chain_id.saturating_duration_since(finished_check_nonce),
);

if let Err(e) = transaction::check_balance_mempool(&signed_tx, &state).await {
mempool.remove(tx_hash).await;
metrics.increment_check_tx_removed_account_balance();
Expand All @@ -194,6 +219,11 @@ async fn handle_check_tx<S: StateReadExt + 'static>(
};
};

let finished_check_balance = Instant::now();
metrics.record_check_tx_duration_seconds_check_balance(
finished_check_balance.saturating_duration_since(finished_check_chain_id),
);

if let Some(removal_reason) = mempool.check_removed_comet_bft(tx_hash).await {
mempool.remove(tx_hash).await;

Expand All @@ -219,6 +249,11 @@ async fn handle_check_tx<S: StateReadExt + 'static>(
}
};

let finished_check_removed = Instant::now();
metrics.record_check_tx_duration_seconds_check_removed(
finished_check_removed.saturating_duration_since(finished_check_balance),
);

// tx is valid, push to mempool
let current_account_nonce = state
.get_account_nonce(crate::address::base_prefixed(
Expand All @@ -227,12 +262,22 @@ async fn handle_check_tx<S: StateReadExt + 'static>(
.await
.expect("can fetch account nonce");

let actions_count = signed_tx.actions().len();

mempool
.insert(signed_tx, current_account_nonce)
.await
.expect(
"tx nonce is greater than or equal to current account nonce; this was checked in \
check_nonce_mempool",
);
let mempool_len = mempool.len().await;

metrics
.record_check_tx_duration_seconds_insert_to_app_mempool(finished_check_removed.elapsed());
metrics.record_actions_per_transaction_in_mempool(actions_count);
metrics.record_transaction_in_mempool_size_bytes(tx_len);
metrics.set_transactions_in_mempool_total(mempool_len);

response::CheckTx::default()
}