Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/transaction-pool/src/blobstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub use mem::InMemoryBlobStore;
pub use noop::NoopBlobStore;
use reth_primitives::{BlobTransactionSidecar, H256};
use std::fmt;
pub use tracker::BlobStoreCanonTracker;
pub use tracker::{BlobStoreCanonTracker, BlobStoreUpdates};

mod mem;
mod noop;
Expand Down
11 changes: 5 additions & 6 deletions crates/transaction-pool/src/blobstore/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct BlobStoreCanonTracker {

impl BlobStoreCanonTracker {
/// Adds a block to the blob store maintenance.
pub(crate) fn add_block(
pub fn add_block(
&mut self,
block_number: BlockNumber,
blob_txs: impl IntoIterator<Item = H256>,
Expand All @@ -22,7 +22,7 @@ impl BlobStoreCanonTracker {
}

/// Adds all blocks to the tracked list of blocks.
pub(crate) fn add_blocks(
pub fn add_blocks(
&mut self,
blocks: impl IntoIterator<Item = (BlockNumber, impl IntoIterator<Item = H256>)>,
) {
Expand All @@ -32,7 +32,7 @@ impl BlobStoreCanonTracker {
}

/// Adds all blob transactions from the given chain to the tracker.
pub(crate) fn add_new_chain_blocks(&mut self, blocks: &ChainBlocks<'_>) {
pub fn add_new_chain_blocks(&mut self, blocks: &ChainBlocks<'_>) {
let blob_txs = blocks.iter().map(|(num, blocks)| {
let iter =
blocks.body.iter().filter(|tx| tx.transaction.is_eip4844()).map(|tx| tx.hash);
Expand All @@ -42,8 +42,7 @@ impl BlobStoreCanonTracker {
}

/// Invoked when a block is finalized.
#[allow(unused)]
pub(crate) fn on_finalized_block(&mut self, number: BlockNumber) -> BlobStoreUpdates {
pub fn on_finalized_block(&mut self, number: BlockNumber) -> BlobStoreUpdates {
let mut finalized = Vec::new();
while let Some(entry) = self.blob_txs_in_blocks.first_entry() {
if *entry.key() <= number {
Expand All @@ -63,7 +62,7 @@ impl BlobStoreCanonTracker {

/// Updates that should be applied to the blob store.
#[derive(Debug, Eq, PartialEq)]
pub(crate) enum BlobStoreUpdates {
pub enum BlobStoreUpdates {
/// No updates.
None,
/// Delete the given finalized transactions from the blob store.
Expand Down
52 changes: 50 additions & 2 deletions crates/transaction-pool/src/maintain.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Support for maintaining the state of the transaction pool

use crate::{
blobstore::BlobStoreCanonTracker,
blobstore::{BlobStoreCanonTracker, BlobStoreUpdates},
metrics::MaintainPoolMetrics,
traits::{CanonicalStateUpdate, ChangedAccount, TransactionPoolExt},
BlockInfo, TransactionPool,
Expand All @@ -10,7 +10,9 @@ use futures_util::{
future::{BoxFuture, Fuse, FusedFuture},
FutureExt, Stream, StreamExt,
};
use reth_primitives::{Address, BlockHash, BlockNumberOrTag, FromRecoveredTransaction};
use reth_primitives::{
Address, BlockHash, BlockNumber, BlockNumberOrTag, FromRecoveredTransaction,
};
use reth_provider::{
BlockReaderIdExt, CanonStateNotification, ChainSpecProvider, PostState, StateProviderFactory,
};
Expand Down Expand Up @@ -97,6 +99,10 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
// keeps track of mined blob transaction so we can clean finalized transactions
let mut blob_store_tracker = BlobStoreCanonTracker::default();

// keeps track of the latest finalized block
let mut last_finalized_block =
FinalizedBlockTracker::new(client.finalized_block_number().ok().flatten());

// keeps track of any dirty accounts that we know of are out of sync with the pool
let mut dirty_addresses = HashSet::new();

Expand Down Expand Up @@ -154,6 +160,19 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
task_spawner.spawn_blocking(fut);
}

// check if we have a new finalized block
if let Some(finalized) =
last_finalized_block.update(client.finalized_block_number().ok().flatten())
{
match blob_store_tracker.on_finalized_block(finalized) {
BlobStoreUpdates::None => {}
BlobStoreUpdates::Finalized(blobs) => {
// remove all finalized blobs from the blob store
pool.delete_blobs(blobs);
}
}
}

// outcomes of the futures we are waiting on
let mut event = None;
let mut reloaded = None;
Expand Down Expand Up @@ -360,6 +379,35 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
}
}

struct FinalizedBlockTracker {
last_finalized_block: Option<BlockNumber>,
}

impl FinalizedBlockTracker {
fn new(last_finalized_block: Option<BlockNumber>) -> Self {
Self { last_finalized_block }
}

/// Updates the tracked finalized block and returns the new finalized block if it changed
fn update(&mut self, finalized_block: Option<BlockNumber>) -> Option<BlockNumber> {
match (self.last_finalized_block, finalized_block) {
(Some(last), Some(finalized)) => {
self.last_finalized_block = Some(finalized);
if last < finalized {
Some(finalized)
} else {
None
}
}
(None, Some(finalized)) => {
self.last_finalized_block = Some(finalized);
Some(finalized)
}
_ => None,
}
}
}

/// Keeps track of the pool's state, whether the accounts in the pool are in sync with the actual
/// state.
#[derive(Debug, Eq, PartialEq)]
Expand Down