Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 16 additions & 17 deletions crates/optimism/flashblocks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

use reth_primitives_traits::NodePrimitives;
use std::sync::Arc;

pub use payload::{
ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashBlock, FlashBlockDecoder,
Expand Down Expand Up @@ -39,39 +40,37 @@ pub type PendingBlockRx<N> = tokio::sync::watch::Receiver<Option<PendingFlashBlo
pub type FlashBlockCompleteSequenceRx =
tokio::sync::broadcast::Receiver<FlashBlockCompleteSequence>;

/// Receiver of received [`FlashBlock`]s from the (websocket) subscription.
///
/// [`FlashBlock`]: crate::FlashBlock
pub type FlashBlockRx = tokio::sync::broadcast::Receiver<Arc<FlashBlock>>;

/// Receiver that signals whether a [`FlashBlock`] is currently being built.
pub type InProgressFlashBlockRx = tokio::sync::watch::Receiver<Option<FlashBlockBuildInfo>>;

/// Container for all flashblocks-related listeners.
///
/// Groups together the three receivers that provide flashblock-related updates.
/// Groups together the channels for flashblock-related updates.
#[derive(Debug)]
pub struct FlashblocksListeners<N: NodePrimitives> {
/// Receiver of the most recent [`PendingFlashBlock`] built out of [`FlashBlock`]s.
/// Receiver of the most recent executed [`PendingFlashBlock`] built out of [`FlashBlock`]s.
pub pending_block_rx: PendingBlockRx<N>,
/// Receiver of the sequences of [`FlashBlock`]s built.
pub flashblock_rx: FlashBlockCompleteSequenceRx,
/// Subscription channel of the complete sequences of [`FlashBlock`]s built.
pub flashblocks_sequence: tokio::sync::broadcast::Sender<FlashBlockCompleteSequence>,
/// Receiver that signals whether a [`FlashBlock`] is currently being built.
pub in_progress_rx: InProgressFlashBlockRx,
/// Subscription channel for received flashblocks from the (websocket) connection.
pub received_flashblocks: tokio::sync::broadcast::Sender<Arc<FlashBlock>>,
}

impl<N: NodePrimitives> FlashblocksListeners<N> {
/// Creates a new [`FlashblocksListeners`] with the given receivers.
/// Creates a new [`FlashblocksListeners`] with the given channels.
pub const fn new(
pending_block_rx: PendingBlockRx<N>,
flashblock_rx: FlashBlockCompleteSequenceRx,
flashblocks_sequence: tokio::sync::broadcast::Sender<FlashBlockCompleteSequence>,
in_progress_rx: InProgressFlashBlockRx,
received_flashblocks: tokio::sync::broadcast::Sender<Arc<FlashBlock>>,
) -> Self {
Self { pending_block_rx, flashblock_rx, in_progress_rx }
}
}

impl<N: NodePrimitives> Clone for FlashblocksListeners<N> {
fn clone(&self) -> Self {
Self {
pending_block_rx: self.pending_block_rx.clone(),
flashblock_rx: self.flashblock_rx.resubscribe(),
in_progress_rx: self.in_progress_rx.clone(),
}
Self { pending_block_rx, flashblocks_sequence, in_progress_rx, received_flashblocks }
}
}
12 changes: 11 additions & 1 deletion crates/optimism/flashblocks/src/sequence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ where
Self { inner: BTreeMap::new(), block_broadcaster: tx, state_root: None }
}

/// Returns the sender half of the [`FlashBlockCompleteSequence`] channel.
pub const fn block_sequence_broadcaster(
&self,
) -> &broadcast::Sender<FlashBlockCompleteSequence> {
&self.block_broadcaster
}

/// Gets a subscriber to the flashblock sequences produced.
pub fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx {
self.block_broadcaster.subscribe()
Expand Down Expand Up @@ -160,7 +167,10 @@ where
}

/// A complete sequence of flashblocks, often corresponding to a full block.
/// Ensure invariants of a complete flashblocks sequence.
///
/// Ensures invariants of a complete flashblocks sequence.
/// If this entire sequence of flashblocks was executed on top of latest block, this also includes
/// the computed state root.
#[derive(Debug, Clone)]
pub struct FlashBlockCompleteSequence {
inner: Vec<FlashBlock>,
Expand Down
53 changes: 40 additions & 13 deletions crates/optimism/flashblocks/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{
sequence::FlashBlockPendingSequence,
worker::{BuildArgs, FlashBlockBuilder},
ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx, InProgressFlashBlockRx,
PendingFlashBlock,
ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequence, FlashBlockCompleteSequenceRx,
InProgressFlashBlockRx, PendingFlashBlock,
};
use alloy_eips::eip2718::WithEncoded;
use alloy_primitives::B256;
Expand All @@ -19,6 +19,7 @@ use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
use reth_tasks::TaskExecutor;
use std::{
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
time::Instant,
};
Expand All @@ -42,6 +43,8 @@ pub struct FlashBlockService<
rx: S,
current: Option<PendingFlashBlock<N>>,
blocks: FlashBlockPendingSequence<N::SignedTx>,
/// Broadcast channel to forward received flashblocks from the subscription.
received_flashblocks_tx: tokio::sync::broadcast::Sender<Arc<FlashBlock>>,
rebuild: bool,
builder: FlashBlockBuilder<EvmConfig, Provider>,
canon_receiver: CanonStateNotifications<N>,
Expand All @@ -60,17 +63,6 @@ pub struct FlashBlockService<
compute_state_root: bool,
}

/// Information for a flashblock currently built
#[derive(Debug, Clone, Copy)]
pub struct FlashBlockBuildInfo {
/// Parent block hash
pub parent_hash: B256,
/// Flashblock index within the current block's sequence
pub index: u64,
/// Block number of the flashblock being built.
pub block_number: u64,
}

impl<N, S, EvmConfig, Provider> FlashBlockService<N, S, EvmConfig, Provider>
where
N: NodePrimitives,
Expand All @@ -92,10 +84,12 @@ where
/// Constructs a new `FlashBlockService` that receives [`FlashBlock`]s from `rx` stream.
pub fn new(rx: S, evm_config: EvmConfig, provider: Provider, spawner: TaskExecutor) -> Self {
let (in_progress_tx, _) = watch::channel(None);
let (received_flashblocks_tx, _) = tokio::sync::broadcast::channel(128);
Self {
rx,
current: None,
blocks: FlashBlockPendingSequence::new(),
received_flashblocks_tx,
canon_receiver: provider.subscribe_to_canonical_state(),
builder: FlashBlockBuilder::new(evm_config, provider),
rebuild: false,
Expand All @@ -114,6 +108,20 @@ where
self
}

/// Returns the sender half to the received flashblocks.
pub const fn flashblocks_broadcaster(
&self,
) -> &tokio::sync::broadcast::Sender<Arc<FlashBlock>> {
&self.received_flashblocks_tx
}

/// Returns the sender half to the flashblock sequence.
pub const fn block_sequence_broadcaster(
&self,
) -> &tokio::sync::broadcast::Sender<FlashBlockCompleteSequence> {
self.blocks.block_sequence_broadcaster()
}

/// Returns a subscriber to the flashblock sequence.
pub fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx {
self.blocks.subscribe_block_sequence()
Expand All @@ -137,6 +145,13 @@ where
warn!("Flashblock service has stopped");
}

/// Notifies all subscribers about the received flashblock
fn notify_received_flashblock(&self, flashblock: &FlashBlock) {
if self.received_flashblocks_tx.receiver_count() > 0 {
let _ = self.received_flashblocks_tx.send(Arc::new(flashblock.clone()));
}
}

/// Returns the [`BuildArgs`] made purely out of [`FlashBlock`]s that were received earlier.
///
/// Returns `None` if the flashblock have no `base` or the base is not a child block of latest.
Expand Down Expand Up @@ -284,6 +299,7 @@ where
while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) {
match result {
Ok(flashblock) => {
this.notify_received_flashblock(&flashblock);
if flashblock.index == 0 {
this.metrics.last_flashblock_length.record(this.blocks.count() as f64);
}
Expand Down Expand Up @@ -344,6 +360,17 @@ where
}
}

/// Information for a flashblock currently built
#[derive(Debug, Clone, Copy)]
pub struct FlashBlockBuildInfo {
/// Parent block hash
pub parent_hash: B256,
/// Flashblock index within the current block's sequence
pub index: u64,
/// Block number of the flashblock being built.
pub block_number: u64,
}

type BuildJob<N> =
(Instant, oneshot::Receiver<eyre::Result<Option<(PendingFlashBlock<N>, CachedReads)>>>);

Expand Down
25 changes: 18 additions & 7 deletions crates/optimism/rpc/src/eth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use reth_evm::ConfigureEvm;
use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy, NodeTypes};
use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx};
use reth_optimism_flashblocks::{
ExecutionPayloadBaseV1, FlashBlockBuildInfo, FlashBlockCompleteSequenceRx, FlashBlockService,
FlashblocksListeners, PendingBlockRx, PendingFlashBlock, WsFlashBlockStream,
ExecutionPayloadBaseV1, FlashBlockBuildInfo, FlashBlockCompleteSequenceRx, FlashBlockRx,
FlashBlockService, FlashblocksListeners, PendingBlockRx, PendingFlashBlock, WsFlashBlockStream,
};
use reth_rpc::eth::core::EthApiInner;
use reth_rpc_eth_api::{
Expand Down Expand Up @@ -110,9 +110,14 @@ impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
self.inner.flashblocks.as_ref().map(|f| f.pending_block_rx.clone())
}

/// Returns a flashblock receiver, if any, by resubscribing to it.
pub fn flashblock_rx(&self) -> Option<FlashBlockCompleteSequenceRx> {
self.inner.flashblocks.as_ref().map(|f| f.flashblock_rx.resubscribe())
/// Returns a new subscription to received flashblocks.
pub fn subscribe_received_flashblocks(&self) -> Option<FlashBlockRx> {
self.inner.flashblocks.as_ref().map(|f| f.received_flashblocks.subscribe())
}

/// Returns a new subscription to flashblock sequences.
pub fn subscribe_flashblock_sequence(&self) -> Option<FlashBlockCompleteSequenceRx> {
self.inner.flashblocks.as_ref().map(|f| f.flashblocks_sequence.subscribe())
}

/// Returns information about the flashblock currently being built, if any.
Expand Down Expand Up @@ -501,12 +506,18 @@ where
ctx.components.task_executor().clone(),
);

let flashblock_rx = service.subscribe_block_sequence();
let flashblocks_sequence = service.block_sequence_broadcaster().clone();
let received_flashblocks = service.flashblocks_broadcaster().clone();
let in_progress_rx = service.subscribe_in_progress();

ctx.components.task_executor().spawn(Box::pin(service.run(tx)));

Some(FlashblocksListeners::new(pending_rx, flashblock_rx, in_progress_rx))
Some(FlashblocksListeners::new(
pending_rx,
flashblocks_sequence,
in_progress_rx,
received_flashblocks,
))
} else {
None
};
Expand Down
Loading