Skip to content

Commit e892c02

Browse files
committed
Improve best tx wrapper
1 parent 6d6763e commit e892c02

File tree

7 files changed

+646
-83
lines changed

7 files changed

+646
-83
lines changed

crates/op-rbuilder/src/builders/context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ impl<ExtraCtx: Debug + Default> OpPayloadBuilderCtx<ExtraCtx> {
326326
&self,
327327
info: &mut ExecutionInfo<E>,
328328
state: &mut State<DB>,
329-
mut best_txs: impl PayloadTxsBounds,
329+
best_txs: &mut impl PayloadTxsBounds,
330330
block_gas_limit: u64,
331331
block_da_limit: Option<u64>,
332332
) -> Result<Option<()>, PayloadBuilderError>
Lines changed: 164 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,7 @@
1-
use std::{
2-
collections::{BTreeMap, VecDeque},
3-
sync::{
4-
atomic::{AtomicU64, Ordering},
5-
Arc,
6-
},
7-
};
8-
9-
use alloy_primitives::Address;
1+
use alloy_primitives::{Address, TxHash};
102
use reth_payload_util::PayloadTransactions;
113
use reth_transaction_pool::PoolTransaction;
4+
use std::collections::HashSet;
125
use tracing::debug;
136

147
use crate::tx::MaybeFlashblockFilter;
@@ -19,22 +12,36 @@ where
1912
I: PayloadTransactions<Transaction = T>,
2013
{
2114
inner: I,
22-
current_flashblock_number: Arc<AtomicU64>,
23-
early_transactions: BTreeMap<u64, VecDeque<T>>,
15+
current_flashblock_number: u64,
16+
// Transactions that were already commited to the state. Using them again would cause NonceTooLow
17+
// so we skip them
18+
commited_transactions: HashSet<TxHash>,
2419
}
2520

2621
impl<T, I> BestFlashblocksTxs<T, I>
2722
where
2823
T: PoolTransaction,
2924
I: PayloadTransactions<Transaction = T>,
3025
{
31-
pub fn new(inner: I, current_flashblock_number: Arc<AtomicU64>) -> Self {
26+
pub fn new(inner: I) -> Self {
3227
Self {
3328
inner,
34-
current_flashblock_number,
35-
early_transactions: Default::default(),
29+
current_flashblock_number: 0,
30+
commited_transactions: Default::default(),
3631
}
3732
}
33+
34+
/// Replaces current iterator with new one. We use it on new flashblock building, to refresh
35+
/// priority boundaries
36+
pub fn refresh_iterator(&mut self, inner: I, current_flashblock_number: u64) {
37+
self.inner = inner;
38+
self.current_flashblock_number = current_flashblock_number;
39+
}
40+
41+
/// Remove transaction from next iteration and it already in the state
42+
pub fn mark_commited(&mut self, txs: Vec<TxHash>) {
43+
self.commited_transactions.extend(txs);
44+
}
3845
}
3946

4047
impl<T, I> PayloadTransactions for BestFlashblocksTxs<T, I>
@@ -46,66 +53,30 @@ where
4653

4754
fn next(&mut self, ctx: ()) -> Option<Self::Transaction> {
4855
loop {
49-
let flashblock_number = self.current_flashblock_number.load(Ordering::Relaxed);
50-
51-
// Check for new transactions that can be executed with the higher flashblock number
52-
while let Some((&min_flashblock, _)) = self.early_transactions.first_key_value() {
53-
if min_flashblock > flashblock_number {
54-
break;
55-
}
56-
57-
if let Some(mut txs) = self.early_transactions.remove(&min_flashblock) {
58-
while let Some(tx) = txs.pop_front() {
59-
// Re-check max flashblock number just in case
60-
if let Some(max) = tx.flashblock_number_max() {
61-
if flashblock_number > max {
62-
debug!(
63-
target: "payload_builder",
64-
sender = ?tx.sender(),
65-
nonce = tx.nonce(),
66-
current_flashblock = flashblock_number,
67-
max_flashblock = max,
68-
"Bundle flashblock max exceeded"
69-
);
70-
self.mark_invalid(tx.sender(), tx.nonce());
71-
continue;
72-
}
73-
}
74-
75-
// The vecdeque isn't modified in place so we need to replace it
76-
if !txs.is_empty() {
77-
self.early_transactions.insert(min_flashblock, txs);
78-
}
79-
80-
return Some(tx);
81-
}
82-
}
83-
}
84-
8556
let tx = self.inner.next(ctx)?;
57+
// Skip transaction we already included
58+
if self.commited_transactions.contains(tx.hash()) {
59+
continue;
60+
}
8661

8762
let flashblock_number_min = tx.flashblock_number_min();
8863
let flashblock_number_max = tx.flashblock_number_max();
8964

9065
// Check min flashblock requirement
9166
if let Some(min) = flashblock_number_min {
92-
if flashblock_number < min {
93-
self.early_transactions
94-
.entry(min)
95-
.or_default()
96-
.push_back(tx);
67+
if self.current_flashblock_number < min {
9768
continue;
9869
}
9970
}
10071

10172
// Check max flashblock requirement
10273
if let Some(max) = flashblock_number_max {
103-
if flashblock_number > max {
74+
if self.current_flashblock_number > max {
10475
debug!(
10576
target: "payload_builder",
10677
sender = ?tx.sender(),
10778
nonce = tx.nonce(),
108-
current_flashblock = flashblock_number,
79+
current_flashblock = self.current_flashblock_number,
10980
max_flashblock = max,
11081
"Bundle flashblock max exceeded"
11182
);
@@ -118,15 +89,144 @@ where
11889
}
11990
}
12091

92+
/// Proxy to inner iterator
12193
fn mark_invalid(&mut self, sender: Address, nonce: u64) {
12294
self.inner.mark_invalid(sender, nonce);
95+
}
96+
}
97+
98+
#[cfg(test)]
99+
mod tests {
100+
use crate::{
101+
builders::flashblocks::best_txs::BestFlashblocksTxs,
102+
mock_tx::{MockFbTransaction, MockFbTransactionFactory},
103+
};
104+
use alloy_consensus::Transaction;
105+
use reth_payload_util::{BestPayloadTransactions, PayloadTransactions};
106+
use reth_transaction_pool::{pool::PendingPool, CoinbaseTipOrdering, PoolTransaction};
107+
use std::sync::Arc;
108+
109+
#[test]
110+
fn test_simple_case() {
111+
let mut pool = PendingPool::new(CoinbaseTipOrdering::<MockFbTransaction>::default());
112+
let mut f = MockFbTransactionFactory::default();
113+
114+
// Add 3 regular transaction
115+
let tx_1 = f.create_eip1559();
116+
let tx_2 = f.create_eip1559();
117+
let tx_3 = f.create_eip1559();
118+
pool.add_transaction(Arc::new(tx_1), 0);
119+
pool.add_transaction(Arc::new(tx_2), 0);
120+
pool.add_transaction(Arc::new(tx_3), 0);
121+
122+
// Create iterator
123+
let mut iterator = BestFlashblocksTxs::new(BestPayloadTransactions::new(pool.best()));
124+
// ### First flashblock
125+
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 0);
126+
// Accept first tx
127+
let tx1 = iterator.next(()).unwrap();
128+
// Invalidate second tx
129+
let tx2 = iterator.next(()).unwrap();
130+
iterator.mark_invalid(tx2.sender(), tx2.nonce());
131+
// Accept third tx
132+
let tx3 = iterator.next(()).unwrap();
133+
// Check that it's empty
134+
assert!(iterator.next(()).is_none(), "Iterator should be empty");
135+
// Mark transaction as commited
136+
iterator.mark_commited(vec![*tx1.hash(), *tx3.hash()]);
137+
138+
// ### Second flashblock
139+
// It should not return txs 1 and 3, but should return 2
140+
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 1);
141+
let tx2 = iterator.next(()).unwrap();
142+
// Check that it's empty
143+
assert!(iterator.next(()).is_none(), "Iterator should be empty");
144+
// Mark transaction as commited
145+
iterator.mark_commited(vec![*tx2.hash()]);
146+
147+
// ### Third flashblock
148+
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 2);
149+
// Check that it's empty
150+
assert!(iterator.next(()).is_none(), "Iterator should be empty");
151+
}
123152

124-
// Clear early_transactions from this sender with a greater nonce as
125-
// these transactions now will not execute because there would be a
126-
// nonce gap
127-
self.early_transactions.retain(|_, txs| {
128-
txs.retain(|tx| !(tx.sender() == sender && tx.nonce() > nonce));
129-
!txs.is_empty()
130-
});
153+
/// Test bundle cases
154+
/// We won't mark transactions as commited to test that boundaries are respected
155+
#[test]
156+
fn test_bundle_case() {
157+
let mut pool = PendingPool::new(CoinbaseTipOrdering::<MockFbTransaction>::default());
158+
let mut f = MockFbTransactionFactory::default();
159+
160+
// Add 4 fb transaction
161+
let tx_1 = f.create_legacy_fb(None, None);
162+
let tx_1_hash = *tx_1.hash();
163+
let tx_2 = f.create_legacy_fb(None, Some(1));
164+
let tx_2_hash = *tx_2.hash();
165+
let tx_3 = f.create_legacy_fb(Some(1), None);
166+
let tx_3_hash = *tx_3.hash();
167+
let tx_4 = f.create_legacy_fb(Some(2), Some(3));
168+
let tx_4_hash = *tx_4.hash();
169+
pool.add_transaction(Arc::new(tx_1), 0);
170+
pool.add_transaction(Arc::new(tx_2), 0);
171+
pool.add_transaction(Arc::new(tx_3), 0);
172+
pool.add_transaction(Arc::new(tx_4), 0);
173+
174+
// Create iterator
175+
let mut iterator = BestFlashblocksTxs::new(BestPayloadTransactions::new(pool.best()));
176+
// ### First flashblock
177+
// should contain txs 1 and 2
178+
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 0);
179+
let tx1 = iterator.next(()).unwrap();
180+
assert_eq!(tx1.hash(), &tx_1_hash);
181+
let tx2 = iterator.next(()).unwrap();
182+
assert_eq!(tx2.hash(), &tx_2_hash);
183+
// Check that it's empty
184+
assert!(iterator.next(()).is_none(), "Iterator should be empty");
185+
186+
// ### Second flashblock
187+
// should contain txs 1, 2, and 3
188+
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 1);
189+
let tx1 = iterator.next(()).unwrap();
190+
assert_eq!(tx1.hash(), &tx_1_hash);
191+
let tx2 = iterator.next(()).unwrap();
192+
assert_eq!(tx2.hash(), &tx_2_hash);
193+
let tx3 = iterator.next(()).unwrap();
194+
assert_eq!(tx3.hash(), &tx_3_hash);
195+
// Check that it's empty
196+
assert!(iterator.next(()).is_none(), "Iterator should be empty");
197+
198+
// ### Third flashblock
199+
// should contain txs 1, 3, and 4
200+
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 2);
201+
let tx1 = iterator.next(()).unwrap();
202+
assert_eq!(tx1.hash(), &tx_1_hash);
203+
let tx3 = iterator.next(()).unwrap();
204+
assert_eq!(tx3.hash(), &tx_3_hash);
205+
let tx4 = iterator.next(()).unwrap();
206+
assert_eq!(tx4.hash(), &tx_4_hash);
207+
// Check that it's empty
208+
assert!(iterator.next(()).is_none(), "Iterator should be empty");
209+
210+
// ### Forth flashblock
211+
// should contain txs 1, 3, and 4
212+
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 3);
213+
let tx1 = iterator.next(()).unwrap();
214+
assert_eq!(tx1.hash(), &tx_1_hash);
215+
let tx3 = iterator.next(()).unwrap();
216+
assert_eq!(tx3.hash(), &tx_3_hash);
217+
let tx4 = iterator.next(()).unwrap();
218+
assert_eq!(tx4.hash(), &tx_4_hash);
219+
// Check that it's empty
220+
assert!(iterator.next(()).is_none(), "Iterator should be empty");
221+
222+
// ### Fifth flashblock
223+
// should contain txs 1 and 3
224+
iterator.refresh_iterator(BestPayloadTransactions::new(pool.best()), 4);
225+
let tx1 = iterator.next(()).unwrap();
226+
assert_eq!(tx1.hash(), &tx_1_hash);
227+
let tx3 = iterator.next(()).unwrap();
228+
assert_eq!(tx3.hash(), &tx_3_hash);
229+
// Check that it's empty
230+
assert!(iterator.next(()).is_none(), "Iterator should be empty");
131231
}
132232
}

crates/op-rbuilder/src/builders/flashblocks/payload.rs

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,7 @@ use rollup_boost::{
4343
use serde::{Deserialize, Serialize};
4444
use std::{
4545
ops::{Div, Rem},
46-
sync::{
47-
atomic::{AtomicU64, Ordering},
48-
Arc, OnceLock,
49-
},
46+
sync::{Arc, OnceLock},
5047
time::Instant,
5148
};
5249
use tokio::sync::{
@@ -65,15 +62,15 @@ struct ExtraExecutionInfo {
6562
#[derive(Debug, Default)]
6663
struct FlashblocksExtraCtx {
6764
/// Current flashblock index
68-
pub flashblock_index: Arc<AtomicU64>,
65+
pub flashblock_index: u64,
6966
/// Target flashblock count
7067
pub target_flashblock_count: u64,
7168
}
7269

7370
impl OpPayloadBuilderCtx<FlashblocksExtraCtx> {
7471
/// Returns the current flashblock index
7572
pub fn flashblock_index(&self) -> u64 {
76-
self.extra_ctx.flashblock_index.load(Ordering::Relaxed)
73+
self.extra_ctx.flashblock_index
7774
}
7875

7976
/// Returns the target flashblock count
@@ -83,10 +80,8 @@ impl OpPayloadBuilderCtx<FlashblocksExtraCtx> {
8380

8481
/// Increments the flashblock index
8582
pub fn increment_flashblock_index(&mut self) -> u64 {
86-
self.extra_ctx
87-
.flashblock_index
88-
.fetch_add(1, Ordering::Relaxed);
89-
self.flashblock_index()
83+
self.extra_ctx.flashblock_index += 1;
84+
self.extra_ctx.flashblock_index
9085
}
9186

9287
/// Sets the target flashblock count
@@ -261,7 +256,7 @@ where
261256
builder_signer: self.config.builder_signer,
262257
metrics: Default::default(),
263258
extra_ctx: FlashblocksExtraCtx {
264-
flashblock_index: Arc::new(AtomicU64::new(0)),
259+
flashblock_index: 0,
265260
target_flashblock_count: self.config.flashblocks_per_block(),
266261
},
267262
max_gas_per_txn: self.config.max_gas_per_txn,
@@ -378,6 +373,11 @@ where
378373
*da_limit = da_limit.saturating_sub(builder_tx_da_size);
379374
}
380375

376+
// Create best_transaction iterator
377+
let mut best_txs = BestFlashblocksTxs::new(BestPayloadTransactions::new(
378+
self.pool
379+
.best_transactions_with_attributes(ctx.best_transaction_attributes()),
380+
));
381381
// This channel coordinates flashblock building
382382
let (fb_cancel_token_rx, mut fb_cancel_token_tx) =
383383
mpsc::channel((self.config.flashblocks_per_block() + 1) as usize);
@@ -443,13 +443,13 @@ where
443443
}
444444

445445
let best_txs_start_time = Instant::now();
446-
let best_txs = BestFlashblocksTxs::new(
446+
best_txs.refresh_iterator(
447447
BestPayloadTransactions::new(
448448
self.pool.best_transactions_with_attributes(
449449
ctx.best_transaction_attributes(),
450450
),
451451
),
452-
ctx.extra_ctx.flashblock_index.clone(),
452+
ctx.flashblock_index(),
453453
);
454454
let transaction_pool_fetch_time = best_txs_start_time.elapsed();
455455
ctx.metrics
@@ -463,10 +463,19 @@ where
463463
ctx.execute_best_transactions(
464464
&mut info,
465465
&mut state,
466-
best_txs,
466+
&mut best_txs,
467467
total_gas_per_batch.min(ctx.block_gas_limit()),
468468
total_da_per_batch,
469469
)?;
470+
// Extract last transactions
471+
let new_transactions = info.executed_transactions
472+
[info.extra.last_flashblock_index..]
473+
.to_vec()
474+
.iter()
475+
.map(|tx| tx.tx_hash())
476+
.collect::<Vec<_>>();
477+
best_txs.mark_commited(new_transactions);
478+
470479
// We got block cancelled, we won't need anything from the block at this point
471480
// Caution: this assume that block cancel token only cancelled when new FCU is received
472481
if block_cancel.is_cancelled() {

0 commit comments

Comments
 (0)