Skip to content

Commit c5c46f3

Browse files
authored
Add flashblock number filters to eth_sendBundle (#213)
* add min and max flashblock number fields to bundle * add min and max flashblock number to FBPooledTransaction * filter based on flashblock number * wrap best txs iterator to consider flashblock number * fix map structure and wrap better * integration tests
1 parent ef159dd commit c5c46f3

File tree

9 files changed

+443
-82
lines changed

9 files changed

+443
-82
lines changed
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
use std::{
2+
collections::{BTreeMap, VecDeque},
3+
sync::{
4+
atomic::{AtomicU64, Ordering},
5+
Arc,
6+
},
7+
};
8+
9+
use alloy_primitives::Address;
10+
use reth_payload_util::PayloadTransactions;
11+
use reth_transaction_pool::PoolTransaction;
12+
use tracing::debug;
13+
14+
use crate::tx::MaybeFlashblockFilter;
15+
16+
pub struct BestFlashblocksTxs<T, I>
17+
where
18+
T: PoolTransaction,
19+
I: PayloadTransactions<Transaction = T>,
20+
{
21+
inner: I,
22+
current_flashblock_number: Arc<AtomicU64>,
23+
early_transactions: BTreeMap<u64, VecDeque<T>>,
24+
}
25+
26+
impl<T, I> BestFlashblocksTxs<T, I>
27+
where
28+
T: PoolTransaction,
29+
I: PayloadTransactions<Transaction = T>,
30+
{
31+
pub fn new(inner: I, current_flashblock_number: Arc<AtomicU64>) -> Self {
32+
Self {
33+
inner,
34+
current_flashblock_number,
35+
early_transactions: Default::default(),
36+
}
37+
}
38+
}
39+
40+
impl<T, I> PayloadTransactions for BestFlashblocksTxs<T, I>
41+
where
42+
T: PoolTransaction + MaybeFlashblockFilter,
43+
I: PayloadTransactions<Transaction = T>,
44+
{
45+
type Transaction = T;
46+
47+
fn next(&mut self, ctx: ()) -> Option<Self::Transaction> {
48+
loop {
49+
let flashblock_number = self.current_flashblock_number.load(Ordering::Relaxed);
50+
51+
// Check for new transactions that can be executed with the higher flashblock number
52+
while let Some((&min_flashblock, _)) = self.early_transactions.first_key_value() {
53+
if min_flashblock > flashblock_number {
54+
break;
55+
}
56+
57+
if let Some(mut txs) = self.early_transactions.remove(&min_flashblock) {
58+
while let Some(tx) = txs.pop_front() {
59+
// Re-check max flashblock number just in case
60+
if let Some(max) = tx.flashblock_number_max() {
61+
if flashblock_number > max {
62+
debug!(
63+
target: "payload_builder",
64+
sender = ?tx.sender(),
65+
nonce = tx.nonce(),
66+
current_flashblock = flashblock_number,
67+
max_flashblock = max,
68+
"Bundle flashblock max exceeded"
69+
);
70+
self.mark_invalid(tx.sender(), tx.nonce());
71+
continue;
72+
}
73+
}
74+
75+
// The vecdeque isn't modified in place so we need to replace it
76+
if !txs.is_empty() {
77+
self.early_transactions.insert(min_flashblock, txs);
78+
}
79+
80+
return Some(tx);
81+
}
82+
}
83+
}
84+
85+
let tx = self.inner.next(ctx)?;
86+
87+
let flashblock_number_min = tx.flashblock_number_min();
88+
let flashblock_number_max = tx.flashblock_number_max();
89+
90+
// Check min flashblock requirement
91+
if let Some(min) = flashblock_number_min {
92+
if flashblock_number < min {
93+
self.early_transactions
94+
.entry(min)
95+
.or_default()
96+
.push_back(tx);
97+
continue;
98+
}
99+
}
100+
101+
// Check max flashblock requirement
102+
if let Some(max) = flashblock_number_max {
103+
if flashblock_number > max {
104+
debug!(
105+
target: "payload_builder",
106+
sender = ?tx.sender(),
107+
nonce = tx.nonce(),
108+
current_flashblock = flashblock_number,
109+
max_flashblock = max,
110+
"Bundle flashblock max exceeded"
111+
);
112+
self.inner.mark_invalid(tx.sender(), tx.nonce());
113+
continue;
114+
}
115+
}
116+
117+
return Some(tx);
118+
}
119+
}
120+
121+
fn mark_invalid(&mut self, sender: Address, nonce: u64) {
122+
self.inner.mark_invalid(sender, nonce);
123+
124+
// Clear early_transactions from this sender with a greater nonce as
125+
// these transactions now will not execute because there would be a
126+
// nonce gap
127+
self.early_transactions.retain(|_, txs| {
128+
txs.retain(|tx| !(tx.sender() == sender && tx.nonce() > nonce));
129+
!txs.is_empty()
130+
});
131+
}
132+
}

crates/op-rbuilder/src/builders/flashblocks/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use service::FlashblocksServiceBuilder;
55

66
mod config;
77
//mod context;
8+
mod best_txs;
89
mod payload;
910
mod service;
1011
mod wspub;

crates/op-rbuilder/src/builders/flashblocks/payload.rs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use super::{config::FlashblocksConfig, wspub::WebSocketPublisher};
22
use crate::{
33
builders::{
44
context::{estimate_gas_for_builder_tx, OpPayloadBuilderCtx},
5-
flashblocks::config::FlashBlocksConfigExt,
5+
flashblocks::{best_txs::BestFlashblocksTxs, config::FlashBlocksConfigExt},
66
generator::{BlockCell, BuildArguments},
77
BuilderConfig, BuilderTx,
88
},
@@ -42,7 +42,10 @@ use rollup_boost::{
4242
use serde::{Deserialize, Serialize};
4343
use std::{
4444
ops::{Div, Rem},
45-
sync::Arc,
45+
sync::{
46+
atomic::{AtomicU64, Ordering},
47+
Arc,
48+
},
4649
time::Instant,
4750
};
4851
use tokio::sync::{
@@ -61,15 +64,15 @@ struct ExtraExecutionInfo {
6164
#[derive(Debug, Default)]
6265
struct FlashblocksExtraCtx {
6366
/// Current flashblock index
64-
pub flashblock_index: u64,
67+
pub flashblock_index: Arc<AtomicU64>,
6568
/// Target flashblock count
6669
pub target_flashblock_count: u64,
6770
}
6871

6972
impl OpPayloadBuilderCtx<FlashblocksExtraCtx> {
7073
/// Returns the current flashblock index
7174
pub fn flashblock_index(&self) -> u64 {
72-
self.extra_ctx.flashblock_index
75+
self.extra_ctx.flashblock_index.load(Ordering::Relaxed)
7376
}
7477

7578
/// Returns the target flashblock count
@@ -79,8 +82,10 @@ impl OpPayloadBuilderCtx<FlashblocksExtraCtx> {
7982

8083
/// Increments the flashblock index
8184
pub fn increment_flashblock_index(&mut self) -> u64 {
82-
self.extra_ctx.flashblock_index += 1;
83-
self.extra_ctx.flashblock_index
85+
self.extra_ctx
86+
.flashblock_index
87+
.fetch_add(1, Ordering::Relaxed);
88+
self.flashblock_index()
8489
}
8590

8691
/// Sets the target flashblock count
@@ -248,7 +253,7 @@ where
248253
builder_signer: self.config.builder_signer,
249254
metrics: Default::default(),
250255
extra_ctx: FlashblocksExtraCtx {
251-
flashblock_index: 0,
256+
flashblock_index: Arc::new(AtomicU64::new(0)),
252257
target_flashblock_count: self.config.flashblocks_per_block(),
253258
},
254259
};
@@ -430,10 +435,13 @@ where
430435
.build();
431436

432437
let best_txs_start_time = Instant::now();
433-
let best_txs = BestPayloadTransactions::new(
434-
// We are not using without_updates in here, so arriving transaction could target the current block
435-
self.pool
436-
.best_transactions_with_attributes(ctx.best_transaction_attributes()),
438+
let best_txs = BestFlashblocksTxs::new(
439+
BestPayloadTransactions::new(
440+
self.pool.best_transactions_with_attributes(
441+
ctx.best_transaction_attributes(),
442+
),
443+
),
444+
ctx.extra_ctx.flashblock_index.clone(),
437445
);
438446
let transaction_pool_fetch_time = best_txs_start_time.elapsed();
439447
ctx.metrics

0 commit comments

Comments
 (0)