Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions bin/reth/src/commands/debug_cmd/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
utils::get_single_header,
};
use clap::Parser;
use futures::{stream::select as stream_select, StreamExt};
use futures::stream::select as stream_select;
use reth_beacon_consensus::EthBeaconConsensus;
use reth_cli_runner::CliContext;
use reth_config::{config::EtlConfig, Config};
Expand Down Expand Up @@ -259,8 +259,8 @@ impl Command {

let pipeline_events = pipeline.events();
let events = stream_select(
network.event_listener().map(Into::into),
pipeline_events.map(Into::into),
reth_node_events::node::handle_broadcast_stream(network.event_listener()),
reth_node_events::node::handle_broadcast_stream(pipeline_events),
);
ctx.task_executor.spawn_critical(
"events task",
Expand Down
6 changes: 3 additions & 3 deletions bin/reth/src/commands/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
};
use clap::Parser;
use eyre::Context;
use futures::{Stream, StreamExt};
use futures::Stream;
use reth_beacon_consensus::EthBeaconConsensus;
use reth_config::{config::EtlConfig, Config};
use reth_consensus::Consensus;
Expand Down Expand Up @@ -257,7 +257,7 @@ where

let max_block = file_client.max_block().unwrap_or(0);

let mut pipeline = Pipeline::builder()
let pipeline = Pipeline::builder()
.with_tip_sender(tip_tx)
// we want to sync all blocks the file client provides or 0 if empty
.with_max_block(max_block)
Expand All @@ -277,7 +277,7 @@ where
)
.build(provider_factory, static_file_producer);

let events = pipeline.events().map(Into::into);
let events = reth_node_events::node::handle_broadcast_stream(pipeline.events());

Ok((pipeline, events))
}
Expand Down
6 changes: 3 additions & 3 deletions crates/consensus/auto-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::{
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use tokio::sync::{mpsc::UnboundedSender, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::sync::{mpsc::Sender, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::trace;

mod client;
Expand Down Expand Up @@ -97,7 +97,7 @@ pub struct AutoSealBuilder<Client, Pool, Engine: EngineTypes, EvmConfig> {
pool: Pool,
mode: MiningMode,
storage: Storage,
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
to_engine: Sender<BeaconEngineMessage<Engine>>,
canon_state_notification: CanonStateNotificationSender,
evm_config: EvmConfig,
}
Expand All @@ -115,7 +115,7 @@ where
chain_spec: Arc<ChainSpec>,
client: Client,
pool: Pool,
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
to_engine: Sender<BeaconEngineMessage<Engine>>,
canon_state_notification: CanonStateNotificationSender,
mode: MiningMode,
evm_config: EvmConfig,
Expand Down
29 changes: 17 additions & 12 deletions crates/consensus/auto-seal/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio::sync::{mpsc::Sender, oneshot};
use tokio_stream::wrappers::BroadcastStream;
use tracing::{debug, error, warn};

/// A Future that listens for new ready transactions and puts new blocks into storage
Expand All @@ -30,19 +30,19 @@ pub struct MiningTask<Client, Pool: TransactionPool, Executor, Engine: EngineTyp
/// The active miner
miner: MiningMode,
/// Single active future that inserts a new block into `storage`
insert_task: Option<BoxFuture<'static, Option<UnboundedReceiverStream<PipelineEvent>>>>,
insert_task: Option<BoxFuture<'static, Option<BroadcastStream<PipelineEvent>>>>,
/// Shared storage to insert new blocks
storage: Storage,
/// Pool where transactions are stored
pool: Pool,
/// backlog of sets of transactions ready to be mined
queued: VecDeque<Vec<Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>>,
// TODO: ideally this would just be a sender of hashes
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
to_engine: Sender<BeaconEngineMessage<Engine>>,
/// Used to notify consumers of new blocks
canon_state_notification: CanonStateNotificationSender,
/// The pipeline events to listen on
pipe_line_events: Option<UnboundedReceiverStream<PipelineEvent>>,
pipe_line_events: Option<BroadcastStream<PipelineEvent>>,
/// The type used for block execution
block_executor: Executor,
}
Expand All @@ -57,7 +57,7 @@ impl<Executor, Client, Pool: TransactionPool, Engine: EngineTypes>
pub(crate) fn new(
chain_spec: Arc<ChainSpec>,
miner: MiningMode,
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
to_engine: Sender<BeaconEngineMessage<Engine>>,
canon_state_notification: CanonStateNotificationSender,
storage: Storage,
client: Client,
Expand All @@ -80,7 +80,7 @@ impl<Executor, Client, Pool: TransactionPool, Engine: EngineTypes>
}

/// Sets the pipeline events to listen on.
pub fn set_pipeline_events(&mut self, events: UnboundedReceiverStream<PipelineEvent>) {
pub fn set_pipeline_events(&mut self, events: BroadcastStream<PipelineEvent>) {
self.pipe_line_events = Some(events);
}
}
Expand Down Expand Up @@ -166,11 +166,16 @@ where
// send the new update to the engine, this will trigger the engine
// to download and execute the block we just inserted
let (tx, rx) = oneshot::channel();
let _ = to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs: None,
tx,
});
if let Err(err) = to_engine
.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs: None,
tx,
})
.await
{
warn!("sending to consensus bounded channel: {err}");
}
debug!(target: "consensus::auto", ?state, "Sent fork choice update");

match rx.await.unwrap() {
Expand Down
53 changes: 35 additions & 18 deletions crates/consensus/beacon/src/engine/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ use reth_interfaces::RethResult;
use reth_rpc_types::engine::{
CancunPayloadFields, ExecutionPayload, ForkchoiceState, ForkchoiceUpdated, PayloadStatus,
};
use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use reth_tokio_util::EventListeners;
use tokio::sync::{mpsc::Sender, oneshot};
use tokio_stream::wrappers::BroadcastStream;
use tracing::warn;

/// A _shareable_ beacon consensus frontend type. Used to interact with the spawned beacon consensus
/// engine task.
Expand All @@ -22,15 +24,16 @@ pub struct BeaconConsensusEngineHandle<Engine>
where
Engine: EngineTypes,
{
pub(crate) to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
pub(crate) to_engine: Sender<BeaconEngineMessage<Engine>>,
event_listeners: EventListeners<BeaconConsensusEngineEvent>,
}

impl<Engine> Clone for BeaconConsensusEngineHandle<Engine>
where
Engine: EngineTypes,
{
fn clone(&self) -> Self {
Self { to_engine: self.to_engine.clone() }
Self { to_engine: self.to_engine.clone(), event_listeners: self.event_listeners.clone() }
}
}

Expand All @@ -41,8 +44,11 @@ where
Engine: EngineTypes,
{
/// Creates a new beacon consensus engine handle.
pub fn new(to_engine: UnboundedSender<BeaconEngineMessage<Engine>>) -> Self {
Self { to_engine }
pub fn new(
to_engine: Sender<BeaconEngineMessage<Engine>>,
event_listeners: EventListeners<BeaconConsensusEngineEvent>,
) -> Self {
Self { to_engine, event_listeners }
}

/// Sends a new payload message to the beacon consensus engine and waits for a response.
Expand All @@ -54,7 +60,13 @@ where
cancun_fields: Option<CancunPayloadFields>,
) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx });
if let Err(err) = self
.to_engine
.send(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx })
.await
{
warn!("sending to consensus bounded channel: {err}");
}
rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
}

Expand All @@ -68,38 +80,43 @@ where
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
Ok(self
.send_fork_choice_updated(state, payload_attrs)
.await
.map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
.await??
.await?)
}

/// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to
/// wait for a response.
fn send_fork_choice_updated(
async fn send_fork_choice_updated(
&self,
state: ForkchoiceState,
payload_attrs: Option<Engine::PayloadAttributes>,
) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
});
if let Err(err) = self
.to_engine
.send(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx })
.await
{
warn!("sending to consensus bounded channel: {err}");
}
rx
}

/// Sends a transition configuration exchange message to the beacon consensus engine.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_exchangetransitionconfigurationv1>
pub async fn transition_configuration_exchanged(&self) {
let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged);
if let Err(err) =
self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged).await
{
warn!("sending to consensus bounded channel: {err}");
}
}

/// Creates a new [`BeaconConsensusEngineEvent`] listener stream.
pub fn event_listener(&self) -> UnboundedReceiverStream<BeaconConsensusEngineEvent> {
let (tx, rx) = mpsc::unbounded_channel();
let _ = self.to_engine.send(BeaconEngineMessage::EventListener(tx));
UnboundedReceiverStream::new(rx)
pub fn event_listener(&self) -> BroadcastStream<BeaconConsensusEngineEvent> {
self.event_listeners.new_listener()
}
}
3 changes: 1 addition & 2 deletions crates/consensus/beacon/src/engine/hooks/static_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ impl<DB: Database + 'static> StaticFileHook<DB> {
return Ok(None)
};

let Some(mut locked_static_file_producer) = static_file_producer.try_lock_arc()
else {
let Some(locked_static_file_producer) = static_file_producer.try_lock_arc() else {
trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer lock is already taken");
return Ok(None)
};
Expand Down
9 changes: 2 additions & 7 deletions crates/consensus/beacon/src/engine/message.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use crate::{
engine::{error::BeaconOnNewPayloadError, forkchoice::ForkchoiceStatus},
BeaconConsensusEngineEvent,
};
use crate::engine::{error::BeaconOnNewPayloadError, forkchoice::ForkchoiceStatus};
use futures::{future::Either, FutureExt};
use reth_engine_primitives::EngineTypes;
use reth_interfaces::RethResult;
Expand All @@ -15,7 +12,7 @@ use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tokio::sync::oneshot;

/// Represents the outcome of forkchoice update.
///
Expand Down Expand Up @@ -162,6 +159,4 @@ pub enum BeaconEngineMessage<Engine: EngineTypes> {
},
/// Message with exchanged transition configuration.
TransitionConfigurationExchanged,
/// Add a new listener for [`BeaconEngineMessage`].
EventListener(UnboundedSender<BeaconConsensusEngineEvent>),
}
Loading