feat(sequencer): add ttl and invalid cache to app mempool#1138
feat(sequencer): add ttl and invalid cache to app mempool#1138lobstergrindset merged 1 commit intomainfrom
Conversation
0aa41f2 to
be74562
Compare
| ttl: Time::now() | ||
| .unix_timestamp() | ||
| .checked_add(ttl) | ||
| .expect("overflow in enqueued transaction ttl"), |
There was a problem hiding this comment.
i'd prefer having a timeout_time passed in, which is a unix timestamp of when the tx expires, over a ttl. it'll be easier to test and compute client-side when the exact timeout is going to be before submitting
There was a problem hiding this comment.
would the time be decided by the transaction signer? I'm a little hesitant to let users add an expiration if they aren't signing over the expiration. future block builders wouldn't have to respect non-signed data if they didn't want to
There was a problem hiding this comment.
I can see the advantage of the improved UX of allowing the client to choose the expiry time, but OTOH I agree that if we support that, it should also be signed over... and that could be worse UX since resubmitting would require re-signing by all parties (I know we only have one signatory right now, but I guess that could change?)
We'd probably also need to reject txs which had expiry times too far in the future, since they'd effectively never expire.
There was a problem hiding this comment.
okay right, since this doesn't actually add it to the tx type, this is fine. in the future can add it to the actual tx!
| txs_to_readd_to_mempool.push((enqueued_tx, priority)); | ||
| } else { | ||
| // the transaction should be removed from the mempool | ||
| self.mempool.track_invalid(enqueued_tx.tx_hash()).await; |
There was a problem hiding this comment.
Would there be any benefit to including the error (e.g. as format!("{e:#}")) in the InvalidCache along with the hash so it can be provided in the CheckTx response's log?
| }, | ||
| }; | ||
| use priority_queue::PriorityQueue; | ||
| use tendermint::Time; |
There was a problem hiding this comment.
I strongly think we should use tokio::time::{Instant, Duration} instead of this and replace our usage of bare integers with Instant and Duration accordingly.
They're aliases for the std::time equivalents, and with that we get the benefit of a guaranteed monotonic clock. (I'm not sure if tendermint::Time guarantees that or not). But we also get type safety (we can't accidentally mix up an instant with a duration) and in tests we can pause and advance time, meaning we can avoid sleeping in tests.
| ttl: Time::now() | ||
| .unix_timestamp() | ||
| .checked_add(ttl) | ||
| .expect("overflow in enqueued transaction ttl"), |
There was a problem hiding this comment.
I can see the advantage of the improved UX of allowing the client to choose the expiry time, but OTOH I agree that if we support that, it should also be signed over... and that could be worse UX since resubmitting would require re-signing by all parties (I know we only have one signatory right now, but I guess that could change?)
We'd probably also need to reject txs which had expiry times too far in the future, since they'd effectively never expire.
| remove_queue: VecDeque<[u8; 32]>, | ||
| max_size: usize, | ||
| time_to_live: i64, | ||
| size: usize, |
There was a problem hiding this comment.
is this the current size of the cache? could you add a comment for this?
| <= self.time_added[&tx_hash] | ||
| .checked_add(self.time_to_live) | ||
| .expect("overflowed ttl add")) | ||
| } |
There was a problem hiding this comment.
it might be nicer to store expiry_time instead of time_added when a tx is inserted, that way you don't need to do the add + expect here each time
There was a problem hiding this comment.
or maybe have separate functions for is_cached and is_expired?
| self.cache.remove(&removed_tx); | ||
| self.time_added.remove(&removed_tx); | ||
| } else { | ||
| self.size = self.size.checked_add(1).expect("cache size overflowed"); |
There was a problem hiding this comment.
do we need a self.size variable over just checking self.remove_queue.len() or len of the hashmap?
| } | ||
| } | ||
|
|
||
| fn cached(&self, tx_hash: [u8; 32]) -> bool { |
There was a problem hiding this comment.
this funtion returns true if the tx is invalidated but not expired right? could you add a comment for that or maybe change the function name to cached_and_not_expired for clarity?
There was a problem hiding this comment.
is this the right behaviour? based on the logic for adding txs to the invalid cache and when to remove them from cometbft, should this return true if the tx is expired?
There was a problem hiding this comment.
the comment was ambiguous, it was returning true if it was expired in the app's mempool but not expired in the cache. refactored to make it more clear
| queue: Arc<RwLock<MempoolQueue>>, | ||
| set: Arc<RwLock<HashSet<[u8; 32]>>>, | ||
| invalid_cache: Arc<RwLock<InvalidCache>>, | ||
| tx_ttl: i64, |
There was a problem hiding this comment.
do we need to store this if it's already a constant? it appears to always be set to the same value
| self.set.write().await.remove(&tx.tx_hash); | ||
| Some((tx, priority)) | ||
| } else { | ||
| tx |
There was a problem hiding this comment.
| tx | |
| None |
a bit more explicit
|
Changes made since last reviews:
|
crates/astria-sequencer/Cargo.toml
Outdated
| tendermint-proto = { workspace = true } | ||
| tendermint = { workspace = true } | ||
| tokio = { workspace = true, features = ["rt", "tracing"] } | ||
| tokio = { workspace = true, features = ["rt", "tracing", "test-util"] } |
There was a problem hiding this comment.
if the test-util feature is just for tests, move it under dev-dependencies
| } | ||
|
|
||
| const TX_TTL: Duration = Duration::from_secs(600); // 10 minutes | ||
| const REMOVAL_CACHE_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(4096) }; // TODO make configuration variable |
There was a problem hiding this comment.
Fraser recommended this instead I'm guessing for configuration safety purposes, but I can change back to just a usize
| /// `RemovalCache` is a cache used for signaling to `CometBFT` when a | ||
| /// transaction can be removed from the `CometBFT` mempool outside of the | ||
| /// `CheckTx` checks. |
There was a problem hiding this comment.
this is checked in check_tx right? there isn't a way to tell cometbft to remove something from the mempool without cometbft calling check_tx on it
There was a problem hiding this comment.
My b, yeah the comment should read 'other CheckTX checks'. Or, I'll just redo the comment
| /// that failed to execute or to re-add a transaction that has expired. | ||
| #[derive(Clone)] | ||
| pub(crate) struct RemovalCache { | ||
| cache: HashMap<[u8; 32], (Instant, Arc<anyhow::Error>)>, |
There was a problem hiding this comment.
for the error, the only two options right now are expired and failed execution right? i'd prefer to change the Arc<anyhow::Error> to an enum, that would be clearer as to which removal reasons exist
There was a problem hiding this comment.
I was trying to bubble up the reason for failure to the user, but it gets stuck at 'failed execution' or 'failed stateful checks'. I can make it an enum if you don't think that the CheckTx failure case should be more specific than 'failed execution'
| /// the reason why it was cached. None is returned if the transaction | ||
| /// is not cached |
There was a problem hiding this comment.
| /// the reason why it was cached. None is returned if the transaction | |
| /// is not cached | |
| /// the reason why it was cached. None is returned if the transaction | |
| /// is not cached or is not expired |
There was a problem hiding this comment.
I was considering a tx being 'cached' as being both not expired and in the cache, but I can change
There was a problem hiding this comment.
sorry i got confused about the tx removal ttl vs the tx in mempool ttl. this makes sense!
| /// inserts all the given transactions into the mempool | ||
| pub(crate) async fn insert_all(&self, txs: Vec<(EnqueuedTransaction, TransactionPriority)>) { | ||
| self.inner.write().await.extend(txs); | ||
| self.queue.write().await.extend(txs); |
There was a problem hiding this comment.
this isn't the same behaviour as insert as it doesn't preserve the timestamp if it already exists
There was a problem hiding this comment.
You're right. I was just relying on the behavior that we're currently only using this function to re-insert transactions that we just popped. Do you have a preference of me adding a comment that explains that or to do a similar process as in insert()?
There was a problem hiding this comment.
i think doing a similar process as to insert() makes the most sense!
|
Logic changes since last review:
|
| } | ||
|
|
||
| /// inserts or updates the transaction in a timestamp preserving manner | ||
| fn update_or_insert( |
There was a problem hiding this comment.
@Fraser999 is this an okay place to put a function like this? The rust CLI suggested I move it out of the Mempool struct's implementation because it wasn't using the self variable
There was a problem hiding this comment.
Absolutely. Another choice would be to leave it inside the impl Mempool but just don't have self as a parameter. You have to call it like Self::update_or_insert in that case. It can make sense to do that in cases like this where you have more than one struct in scope which could use this function, but it's really only intended to be used by the Mempool struct. But for a private function like this, it's pretty much down to personal preference :)
You could simplify the signature though by just taking queue: &mut PriorityQueue and at callsites passing &mut *self.queue.write().await. Definitely not a big deal though :)
| } | ||
|
|
||
| /// inserts or updates the transaction in a timestamp preserving manner | ||
| fn update_or_insert( |
There was a problem hiding this comment.
Absolutely. Another choice would be to leave it inside the impl Mempool but just don't have self as a parameter. You have to call it like Self::update_or_insert in that case. It can make sense to do that in cases like this where you have more than one struct in scope which could use this function, but it's really only intended to be used by the Mempool struct. But for a private function like this, it's pretty much down to personal preference :)
You could simplify the signature though by just taking queue: &mut PriorityQueue and at callsites passing &mut *self.queue.write().await. Definitely not a big deal though :)
|
Changes since last reviews:
|
055b702 to
b6e3fc7
Compare
* main: chore(bridge-withdrawer): add missing errors and clean up names (#1178) feat(sequencer): add ttl and invalid cache to app mempool (#1138) chore(astria-merkle): add benchmarks (#1179) chore(sequencer-relayer): add timeout to gRPCs to Celestia app (#1191) refactor(core): parse ics20 denoms as ibc or trace prefixed variants (#1181) Mycodecrafting/sequencer seed node (#1188) chore: register all metrics during startup (#1144) feat(charts): option to purge geth mempool (#1182)
Summary
This PR adds to the App's mempool a way to signal to CometBFT when transactions should be removed from the CometBFT mempool.
Background
The CometBFT and App mempools currently get transactions that fail to execute stuck in them. This is because when a transaction fails to execute in
prepare_proposal(), it doesn't get removed from the CometBFT mempool. CometBFT only clears out transactions if they either failhandleCheckTx()or are included in a block. Because of this, CometBFT will re-add the failed transaction to the App mempool during itshandleCheckTx()maintenance, which will cause it to be fed toprepare_proposal()again.We also need a way for full nodes to clear out these invalid transactions. Since these nodes don't run
prepare_proposal(), we need an additional way, like a tx TTL, to signal when transactions can be dropped.Changes
prepare_proposal()get added to the App's removal cache.Testing
Unit tests and local testing of invalid transactions.
Metrics
Added
CHECK_TX_REMOVED_FAILED_EXECUTIONcounter to the sequencer's metrics.Added
CHECK_TX_REMOVED_EXPIREDcounter to the sequencer's metrics.Related
Closes #979