Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 0 additions & 60 deletions crates/optimism/flashblocks/src/app.rs

This file was deleted.

8 changes: 6 additions & 2 deletions crates/optimism/flashblocks/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
//! A downstream integration of Flashblocks.

pub use app::{launch_wss_flashblocks_service, FlashBlockRx};
pub use payload::{
ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashBlock, Metadata,
};
use reth_rpc_eth_types::PendingBlock;
pub use service::FlashBlockService;
pub use ws::{WsConnect, WsFlashBlockStream};

mod app;
mod payload;
mod service;
mod ws;

/// Receiver of the most recent [`PendingBlock`] built out of [`FlashBlock`]s.
///
/// [`FlashBlock`]: crate::FlashBlock
pub type FlashBlockRx<N> = tokio::sync::watch::Receiver<Option<PendingBlock<N>>>;
14 changes: 13 additions & 1 deletion crates/optimism/flashblocks/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct FlashBlockService<
impl<N, S, EvmConfig, Provider> FlashBlockService<N, S, EvmConfig, Provider>
where
N: NodePrimitives,
S: Stream<Item = eyre::Result<FlashBlock>> + Unpin,
EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>,
Provider: StateProviderFactory
+ CanonStateSubscriptions<Primitives = N>
Expand All @@ -58,7 +59,7 @@ where
Block = BlockTy<N>,
Transaction = N::SignedTx,
Receipt = ReceiptTy<N>,
>,
> + Unpin,
{
/// Constructs a new `FlashBlockService` that receives [`FlashBlock`]s from `rx` stream.
pub fn new(rx: S, evm_config: EvmConfig, provider: Provider) -> Self {
Expand All @@ -73,6 +74,17 @@ where
}
}

/// Drives the services and sends new blocks to the receiver
///
/// Note: this should be spawned
pub async fn run(mut self, tx: tokio::sync::watch::Sender<Option<PendingBlock<N>>>) {
while let Some(block) = self.next().await {
if let Ok(block) = block.inspect_err(|e| tracing::error!("{e}")) {
let _ = tx.send(block).inspect_err(|e| tracing::error!("{e}"));
}
}
}

/// Returns the cached reads at the given head hash.
///
/// Returns a new cache instance if this is new `head` hash.
Expand Down
21 changes: 15 additions & 6 deletions crates/optimism/rpc/src/eth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use reth_evm::ConfigureEvm;
use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy};
use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx};
use reth_optimism_flashblocks::{
launch_wss_flashblocks_service, ExecutionPayloadBaseV1, FlashBlockRx,
ExecutionPayloadBaseV1, FlashBlockRx, FlashBlockService, WsFlashBlockStream,
};
use reth_rpc::eth::{core::EthApiInner, DevSigner};
use reth_rpc_eth_api::{
Expand All @@ -43,6 +43,8 @@ use reth_tasks::{
TaskSpawner,
};
use std::{fmt, fmt::Formatter, marker::PhantomData, sync::Arc, time::Instant};
use tokio::sync::watch;
use tracing::info;

/// Adapter for [`EthApiInner`], which holds all the data required to serve core `eth_` API.
pub type EthApiNodeBackend<N, Rpc> = EthApiInner<N, Rpc>;
Expand Down Expand Up @@ -457,13 +459,20 @@ where
None
};

let flashblocks_rx = flashblocks_url.map(|ws_url| {
launch_wss_flashblocks_service(
ws_url,
let flashblocks_rx = if let Some(ws_url) = flashblocks_url {
info!(target: "reth:cli", %ws_url, "Launching flashblocks service");
let (tx, rx) = watch::channel(None);
let stream = WsFlashBlockStream::new(ws_url);
let service = FlashBlockService::new(
stream,
ctx.components.evm_config().clone(),
ctx.components.provider().clone(),
)
});
);
ctx.components.task_executor().spawn_blocking(Box::pin(service.run(tx)));
Some(rx)
} else {
None
};

let eth_api = ctx.eth_api_builder().with_rpc_converter(rpc_converter).build_inner();

Expand Down
Loading