Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/op-rbuilder/src/builders/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ impl<ExtraCtx: Debug + Default> OpPayloadBuilderCtx<ExtraCtx> {
&self,
info: &mut ExecutionInfo<E>,
state: &mut State<DB>,
mut best_txs: impl PayloadTxsBounds,
best_txs: &mut impl PayloadTxsBounds,
block_gas_limit: u64,
block_da_limit: Option<u64>,
) -> Result<Option<()>, PayloadBuilderError>
Expand Down
228 changes: 164 additions & 64 deletions crates/op-rbuilder/src/builders/flashblocks/best_txs.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
use std::{
collections::{BTreeMap, VecDeque},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};

use alloy_primitives::Address;
use alloy_primitives::{Address, TxHash};
use reth_payload_util::PayloadTransactions;
use reth_transaction_pool::PoolTransaction;
use std::collections::HashSet;
use tracing::debug;

use crate::tx::MaybeFlashblockFilter;
Expand All @@ -19,22 +12,36 @@ where
I: PayloadTransactions<Transaction = T>,
{
inner: I,
current_flashblock_number: Arc<AtomicU64>,
early_transactions: BTreeMap<u64, VecDeque<T>>,
current_flashblock_number: u64,
// Transactions that were already commited to the state. Using them again would cause NonceTooLow
// so we skip them
commited_transactions: HashSet<TxHash>,
}

impl<T, I> BestFlashblocksTxs<T, I>
where
T: PoolTransaction,
I: PayloadTransactions<Transaction = T>,
{
pub fn new(inner: I, current_flashblock_number: Arc<AtomicU64>) -> Self {
pub fn new(inner: I) -> Self {
Self {
inner,
current_flashblock_number,
early_transactions: Default::default(),
current_flashblock_number: 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you setting it to 0 because you're only calling new for the first flashblock or is it just a placeholder? If it's a placeholder I think it would be cleaner to have this field be optional

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

placeholder + when we initiate iterator we expect this to be 0

commited_transactions: Default::default(),
}
}

/// Replaces current iterator with new one. We use it on new flashblock building, to refresh
/// priority boundaries
pub fn refresh_iterator(&mut self, inner: I, current_flashblock_number: u64) {
self.inner = inner;
self.current_flashblock_number = current_flashblock_number;
}

/// Remove transaction from next iteration and it already in the state
pub fn mark_commited(&mut self, txs: Vec<TxHash>) {
self.commited_transactions.extend(txs);
}
}

impl<T, I> PayloadTransactions for BestFlashblocksTxs<T, I>
Expand All @@ -46,66 +53,30 @@ where

fn next(&mut self, ctx: ()) -> Option<Self::Transaction> {
loop {
let flashblock_number = self.current_flashblock_number.load(Ordering::Relaxed);

// Check for new transactions that can be executed with the higher flashblock number
while let Some((&min_flashblock, _)) = self.early_transactions.first_key_value() {
if min_flashblock > flashblock_number {
break;
}

if let Some(mut txs) = self.early_transactions.remove(&min_flashblock) {
while let Some(tx) = txs.pop_front() {
// Re-check max flashblock number just in case
if let Some(max) = tx.flashblock_number_max() {
if flashblock_number > max {
debug!(
target: "payload_builder",
sender = ?tx.sender(),
nonce = tx.nonce(),
current_flashblock = flashblock_number,
max_flashblock = max,
"Bundle flashblock max exceeded"
);
self.mark_invalid(tx.sender(), tx.nonce());
continue;
}
}

// The vecdeque isn't modified in place so we need to replace it
if !txs.is_empty() {
self.early_transactions.insert(min_flashblock, txs);
}

return Some(tx);
}
}
}

let tx = self.inner.next(ctx)?;
// Skip transaction we already included
if self.commited_transactions.contains(tx.hash()) {
continue;
}

let flashblock_number_min = tx.flashblock_number_min();
let flashblock_number_max = tx.flashblock_number_max();

// Check min flashblock requirement
if let Some(min) = flashblock_number_min {
if flashblock_number < min {
self.early_transactions
.entry(min)
.or_default()
.push_back(tx);
if self.current_flashblock_number < min {
continue;
}
}

// Check max flashblock requirement
if let Some(max) = flashblock_number_max {
if flashblock_number > max {
if self.current_flashblock_number > max {
debug!(
target: "payload_builder",
sender = ?tx.sender(),
nonce = tx.nonce(),
current_flashblock = flashblock_number,
current_flashblock = self.current_flashblock_number,
max_flashblock = max,
"Bundle flashblock max exceeded"
);
Expand All @@ -118,15 +89,144 @@ where
}
}

/// Proxy to inner iterator
fn mark_invalid(&mut self, sender: Address, nonce: u64) {
self.inner.mark_invalid(sender, nonce);
}
}

#[cfg(test)]
mod tests {
use crate::{
builders::flashblocks::best_txs::BestFlashblocksTxs,
mock_tx::{MockFbTransaction, MockFbTransactionFactory},
};
use alloy_consensus::Transaction;
use reth_payload_util::{BestPayloadTransactions, PayloadTransactions};
use reth_transaction_pool::{pool::PendingPool, CoinbaseTipOrdering, PoolTransaction};
use std::sync::Arc;

#[test]
fn test_simple_case() {
let mut pool = PendingPool::new(CoinbaseTipOrdering::<MockFbTransaction>::default());
let mut f = MockFbTransactionFactory::default();

// Add 3 regular transaction
let tx_1 = f.create_eip1559();
let tx_2 = f.create_eip1559();
let tx_3 = f.create_eip1559();
pool.add_transaction(Arc::new(tx_1), 0);
pool.add_transaction(Arc::new(tx_2), 0);
pool.add_transaction(Arc::new(tx_3), 0);

// Create iterator
let mut iterator = BestFlashblocksTxs::new(BestPayloadTransactions::new(pool.best()));
// ### First flashblock
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 0);
// Accept first tx
let tx1 = iterator.next(()).unwrap();
// Invalidate second tx
let tx2 = iterator.next(()).unwrap();
iterator.mark_invalid(tx2.sender(), tx2.nonce());
// Accept third tx
let tx3 = iterator.next(()).unwrap();
// Check that it's empty
assert!(iterator.next(()).is_none(), "Iterator should be empty");
// Mark transaction as commited
iterator.mark_commited(vec![*tx1.hash(), *tx3.hash()]);

// ### Second flashblock
// It should not return txs 1 and 3, but should return 2
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 1);
let tx2 = iterator.next(()).unwrap();
// Check that it's empty
assert!(iterator.next(()).is_none(), "Iterator should be empty");
// Mark transaction as commited
iterator.mark_commited(vec![*tx2.hash()]);

// ### Third flashblock
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 2);
// Check that it's empty
assert!(iterator.next(()).is_none(), "Iterator should be empty");
}

// Clear early_transactions from this sender with a greater nonce as
// these transactions now will not execute because there would be a
// nonce gap
self.early_transactions.retain(|_, txs| {
txs.retain(|tx| !(tx.sender() == sender && tx.nonce() > nonce));
!txs.is_empty()
});
/// Test bundle cases
/// We won't mark transactions as commited to test that boundaries are respected
#[test]
fn test_bundle_case() {
let mut pool = PendingPool::new(CoinbaseTipOrdering::<MockFbTransaction>::default());
let mut f = MockFbTransactionFactory::default();

// Add 4 fb transaction
let tx_1 = f.create_legacy_fb(None, None);
let tx_1_hash = *tx_1.hash();
let tx_2 = f.create_legacy_fb(None, Some(1));
let tx_2_hash = *tx_2.hash();
let tx_3 = f.create_legacy_fb(Some(1), None);
let tx_3_hash = *tx_3.hash();
let tx_4 = f.create_legacy_fb(Some(2), Some(3));
let tx_4_hash = *tx_4.hash();
pool.add_transaction(Arc::new(tx_1), 0);
pool.add_transaction(Arc::new(tx_2), 0);
pool.add_transaction(Arc::new(tx_3), 0);
pool.add_transaction(Arc::new(tx_4), 0);

// Create iterator
let mut iterator = BestFlashblocksTxs::new(BestPayloadTransactions::new(pool.best()));
// ### First flashblock
// should contain txs 1 and 2
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 0);
let tx1 = iterator.next(()).unwrap();
assert_eq!(tx1.hash(), &tx_1_hash);
let tx2 = iterator.next(()).unwrap();
assert_eq!(tx2.hash(), &tx_2_hash);
// Check that it's empty
assert!(iterator.next(()).is_none(), "Iterator should be empty");

// ### Second flashblock
// should contain txs 1, 2, and 3
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 1);
let tx1 = iterator.next(()).unwrap();
assert_eq!(tx1.hash(), &tx_1_hash);
let tx2 = iterator.next(()).unwrap();
assert_eq!(tx2.hash(), &tx_2_hash);
let tx3 = iterator.next(()).unwrap();
assert_eq!(tx3.hash(), &tx_3_hash);
// Check that it's empty
assert!(iterator.next(()).is_none(), "Iterator should be empty");

// ### Third flashblock
// should contain txs 1, 3, and 4
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 2);
let tx1 = iterator.next(()).unwrap();
assert_eq!(tx1.hash(), &tx_1_hash);
let tx3 = iterator.next(()).unwrap();
assert_eq!(tx3.hash(), &tx_3_hash);
let tx4 = iterator.next(()).unwrap();
assert_eq!(tx4.hash(), &tx_4_hash);
// Check that it's empty
assert!(iterator.next(()).is_none(), "Iterator should be empty");

// ### Forth flashblock
// should contain txs 1, 3, and 4
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 3);
let tx1 = iterator.next(()).unwrap();
assert_eq!(tx1.hash(), &tx_1_hash);
let tx3 = iterator.next(()).unwrap();
assert_eq!(tx3.hash(), &tx_3_hash);
let tx4 = iterator.next(()).unwrap();
assert_eq!(tx4.hash(), &tx_4_hash);
// Check that it's empty
assert!(iterator.next(()).is_none(), "Iterator should be empty");

// ### Fifth flashblock
// should contain txs 1 and 3
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 4);
let tx1 = iterator.next(()).unwrap();
assert_eq!(tx1.hash(), &tx_1_hash);
let tx3 = iterator.next(()).unwrap();
assert_eq!(tx3.hash(), &tx_3_hash);
// Check that it's empty
assert!(iterator.next(()).is_none(), "Iterator should be empty");
}
}
37 changes: 23 additions & 14 deletions crates/op-rbuilder/src/builders/flashblocks/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ use rollup_boost::{
use serde::{Deserialize, Serialize};
use std::{
ops::{Div, Rem},
sync::{
atomic::{AtomicU64, Ordering},
Arc, OnceLock,
},
sync::{Arc, OnceLock},
time::Instant,
};
use tokio::sync::{
Expand All @@ -65,15 +62,15 @@ struct ExtraExecutionInfo {
#[derive(Debug, Default)]
struct FlashblocksExtraCtx {
/// Current flashblock index
pub flashblock_index: Arc<AtomicU64>,
pub flashblock_index: u64,
/// Target flashblock count
pub target_flashblock_count: u64,
}

impl OpPayloadBuilderCtx<FlashblocksExtraCtx> {
/// Returns the current flashblock index
pub fn flashblock_index(&self) -> u64 {
self.extra_ctx.flashblock_index.load(Ordering::Relaxed)
self.extra_ctx.flashblock_index
}

/// Returns the target flashblock count
Expand All @@ -83,10 +80,8 @@ impl OpPayloadBuilderCtx<FlashblocksExtraCtx> {

/// Increments the flashblock index
pub fn increment_flashblock_index(&mut self) -> u64 {
self.extra_ctx
.flashblock_index
.fetch_add(1, Ordering::Relaxed);
self.flashblock_index()
self.extra_ctx.flashblock_index += 1;
self.extra_ctx.flashblock_index
}

/// Sets the target flashblock count
Expand Down Expand Up @@ -261,7 +256,7 @@ where
builder_signer: self.config.builder_signer,
metrics: Default::default(),
extra_ctx: FlashblocksExtraCtx {
flashblock_index: Arc::new(AtomicU64::new(0)),
flashblock_index: 0,
target_flashblock_count: self.config.flashblocks_per_block(),
},
max_gas_per_txn: self.config.max_gas_per_txn,
Expand Down Expand Up @@ -378,6 +373,11 @@ where
*da_limit = da_limit.saturating_sub(builder_tx_da_size);
}

// Create best_transaction iterator
let mut best_txs = BestFlashblocksTxs::new(BestPayloadTransactions::new(
self.pool
.best_transactions_with_attributes(ctx.best_transaction_attributes()),
));
// This channel coordinates flashblock building
let (fb_cancel_token_rx, mut fb_cancel_token_tx) =
mpsc::channel((self.config.flashblocks_per_block() + 1) as usize);
Expand Down Expand Up @@ -443,13 +443,13 @@ where
}

let best_txs_start_time = Instant::now();
let best_txs = BestFlashblocksTxs::new(
best_txs.refresh_iterator(
BestPayloadTransactions::new(
self.pool.best_transactions_with_attributes(
ctx.best_transaction_attributes(),
),
),
ctx.extra_ctx.flashblock_index.clone(),
ctx.flashblock_index(),
);
let transaction_pool_fetch_time = best_txs_start_time.elapsed();
ctx.metrics
Expand All @@ -463,10 +463,19 @@ where
ctx.execute_best_transactions(
&mut info,
&mut state,
best_txs,
&mut best_txs,
total_gas_per_batch.min(ctx.block_gas_limit()),
total_da_per_batch,
)?;
// Extract last transactions
let new_transactions = info.executed_transactions
[info.extra.last_flashblock_index..]
.to_vec()
.iter()
.map(|tx| tx.tx_hash())
.collect::<Vec<_>>();
best_txs.mark_commited(new_transactions);

// We got block cancelled, we won't need anything from the block at this point
// Caution: this assume that block cancel token only cancelled when new FCU is received
if block_cancel.is_cancelled() {
Expand Down
Loading
Loading