Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion crates/optimism/flashblocks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,10 @@ mod ws;
/// Receiver of the most recent [`PendingBlock`] built out of [`FlashBlock`]s.
///
/// [`FlashBlock`]: crate::FlashBlock
pub type FlashBlockRx<N> = tokio::sync::watch::Receiver<Option<PendingBlock<N>>>;
pub type PendingBlockRx<N> = tokio::sync::watch::Receiver<Option<PendingBlock<N>>>;

/// Receiver of the sequences of [`FlashBlock`]s built.
///
/// [`FlashBlock`]: crate::FlashBlock
pub type FlashBlockCompleteSequenceRx =
tokio::sync::broadcast::Receiver<FlashBlockCompleteSequence>;
108 changes: 98 additions & 10 deletions crates/optimism/flashblocks/src/sequence.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use crate::{ExecutionPayloadBaseV1, FlashBlock};
use alloy_eips::eip2718::WithEncoded;
use core::mem;
use eyre::{bail, OptionExt};
use reth_primitives_traits::{Recovered, SignedTransaction};
use std::collections::BTreeMap;
use tracing::trace;
use tokio::sync::broadcast;
use tracing::{debug, trace, warn};

/// The size of the broadcast channel for completed flashblock sequences.
const FLASHBLOCK_SEQUENCE_CHANNEL_SIZE: usize = 128;

/// An ordered B-tree keeping the track of a sequence of [`FlashBlock`]s by their indices.
#[derive(Debug)]
Expand All @@ -13,14 +18,51 @@ pub(crate) struct FlashBlockPendingSequence<T> {
/// With a blocktime of 2s and flashblock tick-rate of 200ms plus one extra flashblock per new
/// pending block, we expect 11 flashblocks per slot.
inner: BTreeMap<u64, PreparedFlashBlock<T>>,
/// Broadcasts flashblocks to subscribers.
block_broadcaster: broadcast::Sender<FlashBlockCompleteSequence>,
}

impl<T> FlashBlockPendingSequence<T>
where
T: SignedTransaction,
{
pub(crate) const fn new() -> Self {
Self { inner: BTreeMap::new() }
pub(crate) fn new() -> Self {
// Note: if the channel is full, send will not block but rather overwrite the oldest
// messages. Order is preserved.
let (tx, _) = broadcast::channel(FLASHBLOCK_SEQUENCE_CHANNEL_SIZE);
Self { inner: BTreeMap::new(), block_broadcaster: tx }
}

/// Gets a subscriber to the flashblock sequences produced.
pub(crate) fn subscribe_block_sequence(
&self,
) -> broadcast::Receiver<FlashBlockCompleteSequence> {
self.block_broadcaster.subscribe()
}

// Clears the state and broadcasts the blocks produced to subscribers.
fn clear_and_broadcast_blocks(&mut self) {
let flashblocks = mem::take(&mut self.inner);

// If there are any subscribers, send the flashblocks to them.
if self.block_broadcaster.receiver_count() > 0 {
let flashblocks = match FlashBlockCompleteSequence::new(
flashblocks.into_iter().map(|block| block.1.into()).collect(),
) {
Ok(flashblocks) => flashblocks,
Err(err) => {
debug!(target: "flashblocks", error = ?err, "Failed to create full flashblock complete sequence");
return;
}
};

// Note: this should only ever fail if there are no receivers. This can happen if
// there is a race condition between the clause right above and this
// one. We can simply warn the user and continue.
if let Err(err) = self.block_broadcaster.send(flashblocks) {
warn!(target: "flashblocks", error = ?err, "Failed to send flashblocks to subscribers");
}
}
}

/// Inserts a new block into the sequence.
Expand All @@ -29,8 +71,10 @@ where
pub(crate) fn insert(&mut self, flashblock: FlashBlock) -> eyre::Result<()> {
if flashblock.index == 0 {
trace!(number=%flashblock.block_number(), "Tracking new flashblock sequence");
// Flash block at index zero resets the whole state
self.clear();
Comment thread
theochap marked this conversation as resolved.

// Flash block at index zero resets the whole state.
self.clear_and_broadcast_blocks();

self.inner.insert(flashblock.index, PreparedFlashBlock::new(flashblock)?);
return Ok(())
}
Expand Down Expand Up @@ -65,10 +109,6 @@ where
.flat_map(|(_, block)| block.txs.clone())
}

fn clear(&mut self) {
self.inner.clear();
}

/// Returns the first block number
pub(crate) fn block_number(&self) -> Option<u64> {
Some(self.inner.values().next()?.block().metadata.block_number)
Expand All @@ -87,7 +127,7 @@ where

/// A complete sequence of flashblocks, often corresponding to a full block.
/// Ensure invariants of a complete flashblocks sequence.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct FlashBlockCompleteSequence(Vec<FlashBlock>);

impl FlashBlockCompleteSequence {
Expand Down Expand Up @@ -153,6 +193,12 @@ impl<T> PreparedFlashBlock<T> {
}
}

impl<T> From<PreparedFlashBlock<T>> for FlashBlock {
fn from(val: PreparedFlashBlock<T>) -> Self {
val.block
}
}

impl<T> PreparedFlashBlock<T>
where
T: SignedTransaction,
Expand Down Expand Up @@ -238,4 +284,46 @@ mod tests {

assert_eq!(actual_txs, expected_txs);
}

#[test]
fn test_sequence_sends_flashblocks_to_subscribers() {
let mut sequence = FlashBlockPendingSequence::<EthereumTxEnvelope<TxEip1559>>::new();
let mut subscriber = sequence.subscribe_block_sequence();

for idx in 0..10 {
sequence
.insert(FlashBlock {
payload_id: Default::default(),
index: idx,
base: Some(ExecutionPayloadBaseV1::default()),
diff: Default::default(),
metadata: Default::default(),
})
.unwrap();
}

assert_eq!(sequence.count(), 10);

// Then we don't receive anything until we insert a new flashblock
let no_flashblock = subscriber.try_recv();
assert!(no_flashblock.is_err());

// Let's insert a new flashblock with index 0
sequence
.insert(FlashBlock {
payload_id: Default::default(),
index: 0,
base: Some(ExecutionPayloadBaseV1::default()),
diff: Default::default(),
metadata: Default::default(),
})
.unwrap();

let flashblocks = subscriber.try_recv().unwrap();
assert_eq!(flashblocks.count(), 10);

for (idx, block) in flashblocks.0.iter().enumerate() {
assert_eq!(block.index, idx as u64);
}
}
}
12 changes: 10 additions & 2 deletions crates/optimism/flashblocks/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
sequence::FlashBlockPendingSequence,
worker::{BuildArgs, FlashBlockBuilder},
ExecutionPayloadBaseV1, FlashBlock,
ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequence,
};
use alloy_eips::eip2718::WithEncoded;
use alloy_primitives::B256;
Expand All @@ -20,7 +20,10 @@ use std::{
task::{ready, Context, Poll},
time::Instant,
};
use tokio::{pin, sync::oneshot};
use tokio::{
pin,
sync::{broadcast, oneshot},
};
use tracing::{debug, trace, warn};

/// The `FlashBlockService` maintains an in-memory [`PendingBlock`] built out of a sequence of
Expand Down Expand Up @@ -80,6 +83,11 @@ where
}
}

/// Returns a subscriber to the flashblock sequence.
pub fn subscribe_block_sequence(&self) -> broadcast::Receiver<FlashBlockCompleteSequence> {
self.blocks.subscribe_block_sequence()
}

/// Drives the services and sends new blocks to the receiver
///
/// Note: this should be spawned
Expand Down
42 changes: 29 additions & 13 deletions crates/optimism/rpc/src/eth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use reth_evm::ConfigureEvm;
use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy};
use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx};
use reth_optimism_flashblocks::{
ExecutionPayloadBaseV1, FlashBlockRx, FlashBlockService, WsFlashBlockStream,
ExecutionPayloadBaseV1, FlashBlockCompleteSequenceRx, FlashBlockService, PendingBlockRx,
WsFlashBlockStream,
};
use reth_rpc::eth::{core::EthApiInner, DevSigner};
use reth_rpc_eth_api::{
Expand Down Expand Up @@ -76,13 +77,15 @@ impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
eth_api: EthApiNodeBackend<N, Rpc>,
sequencer_client: Option<SequencerClient>,
min_suggested_priority_fee: U256,
flashblocks_rx: Option<FlashBlockRx<N::Primitives>>,
pending_block_rx: Option<PendingBlockRx<N::Primitives>>,
flashblock_rx: Option<FlashBlockCompleteSequenceRx>,
) -> Self {
let inner = Arc::new(OpEthApiInner {
eth_api,
sequencer_client,
min_suggested_priority_fee,
flashblocks_rx,
pending_block_rx,
flashblock_rx,
});
Self { inner }
}
Expand All @@ -96,9 +99,14 @@ impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
self.inner.sequencer_client()
}

/// Returns a cloned Flashblocks receiver, if any.
pub fn flashblocks_rx(&self) -> Option<FlashBlockRx<N::Primitives>> {
self.inner.flashblocks_rx.clone()
/// Returns a cloned pending block receiver, if any.
pub fn pending_block_rx(&self) -> Option<PendingBlockRx<N::Primitives>> {
self.inner.pending_block_rx.clone()
}

/// Returns a flashblock receiver, if any, by resubscribing to it.
pub fn flashblock_rx(&self) -> Option<FlashBlockCompleteSequenceRx> {
self.inner.flashblock_rx.as_ref().map(|rx| rx.resubscribe())
}

/// Build a [`OpEthApi`] using [`OpEthApiBuilder`].
Expand All @@ -119,7 +127,7 @@ impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
PendingBlockEnvOrigin::DerivedFromLatest(parent) => parent,
};

let Some(rx) = self.inner.flashblocks_rx.as_ref() else { return Ok(None) };
let Some(rx) = self.inner.pending_block_rx.as_ref() else { return Ok(None) };
let pending_block = rx.borrow();
let Some(pending_block) = pending_block.as_ref() else { return Ok(None) };

Expand Down Expand Up @@ -321,10 +329,14 @@ pub struct OpEthApiInner<N: RpcNodeCore, Rpc: RpcConvert> {
///
/// See also <https://github.com/ethereum-optimism/op-geth/blob/d4e0fe9bb0c2075a9bff269fb975464dd8498f75/eth/gasprice/optimism-gasprice.go#L38-L38>
min_suggested_priority_fee: U256,
/// Flashblocks receiver.
/// Pending block receiver.
///
/// If set, then it provides current pending block based on received Flashblocks.
flashblocks_rx: Option<FlashBlockRx<N::Primitives>>,
pending_block_rx: Option<PendingBlockRx<N::Primitives>>,
/// Flashblocks receiver.
///
/// If set, then it provides sequences of flashblock built.
flashblock_rx: Option<FlashBlockCompleteSequenceRx>,
}

impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApiInner<N, Rpc> {
Expand Down Expand Up @@ -459,29 +471,33 @@ where
None
};

let flashblocks_rx = if let Some(ws_url) = flashblocks_url {
let rxs = if let Some(ws_url) = flashblocks_url {
info!(target: "reth:cli", %ws_url, "Launching flashblocks service");
let (tx, rx) = watch::channel(None);
let (tx, pending_block_rx) = watch::channel(None);
let stream = WsFlashBlockStream::new(ws_url);
let service = FlashBlockService::new(
stream,
ctx.components.evm_config().clone(),
ctx.components.provider().clone(),
ctx.components.task_executor().clone(),
);
let flashblock_rx = service.subscribe_block_sequence();
ctx.components.task_executor().spawn(Box::pin(service.run(tx)));
Some(rx)
Some((pending_block_rx, flashblock_rx))
} else {
None
};

let (pending_block_rx, flashblock_rx) = rxs.unzip();

let eth_api = ctx.eth_api_builder().with_rpc_converter(rpc_converter).build_inner();

Ok(OpEthApi::new(
eth_api,
sequencer_client,
U256::from(min_suggested_priority_fee),
flashblocks_rx,
pending_block_rx,
flashblock_rx,
))
}
}
4 changes: 2 additions & 2 deletions crates/optimism/rpc/src/historical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,12 @@ mod tests {
#[test]
fn parses_transaction_hash_from_params() {
let hash = "0xdbdfa0f88b2cf815fdc1621bd20c2bd2b0eed4f0c56c9be2602957b5a60ec702";
let params_str = format!(r#"["{}"]"#, hash);
let params_str = format!(r#"["{hash}"]"#);
let params = Params::new(Some(&params_str));
let result = parse_transaction_hash_from_params(&params);
assert!(result.is_ok());
let parsed_hash = result.unwrap();
assert_eq!(format!("{:?}", parsed_hash), hash);
assert_eq!(format!("{parsed_hash:?}"), hash);
}

/// Tests that invalid transaction hash returns error.
Expand Down