Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ crates/config/ @onbjerg
crates/consensus/ @rkrasiuk @mattsse @Rjected
crates/engine @rkrasiuk @mattsse @Rjected
crates/e2e-test-utils/ @mattsse @Rjected
crates/engine-primitives/ @rkrasiuk @mattsse @Rjected
crates/engine/ @rkrasiuk @mattsse @Rjected @fgimenez
crates/errors/ @mattsse
crates/ethereum/ @mattsse @Rjected
crates/ethereum-forks/ @mattsse @Rjected
Expand Down
3 changes: 3 additions & 0 deletions crates/consensus/beacon/src/engine/forkchoice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,11 @@ impl From<PayloadStatusEnum> for ForkchoiceStatus {
/// A helper type to check represent hashes of a [`ForkchoiceState`]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ForkchoiceStateHash {
/// Head hash of the [`ForkchoiceState`].
Head(B256),
/// Safe hash of the [`ForkchoiceState`].
Safe(B256),
/// Finalized hash of the [`ForkchoiceState`].
Finalized(B256),
}

Expand Down
2 changes: 1 addition & 1 deletion crates/engine/tree/src/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ mod tests {
checkpoint: StageCheckpoint::new(BlockNumber::from(pipeline_done_after)),
done: true,
})]))
.build(chain_spec.clone());
.build(chain_spec);

let pipeline_sync = PipelineSync::new(pipeline, Box::<TokioTaskExecutor>::default());
let client = TestFullBlockClient::default();
Expand Down
35 changes: 16 additions & 19 deletions crates/engine/tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,27 +102,24 @@ where
Poll::Pending => {}
}

// drain the handler
loop {
// poll the handler for the next event
match this.handler.poll(cx) {
Poll::Ready(handler_event) => {
match handler_event {
HandlerEvent::Pipeline(target) => {
// trigger pipeline and start polling it
this.pipeline.on_action(BackfillAction::Start(target));
continue 'outer
}
HandlerEvent::Event(ev) => {
// bubble up the event
return Poll::Ready(ChainEvent::Handler(ev));
}
// poll the handler for the next event
match this.handler.poll(cx) {
Poll::Ready(handler_event) => {
match handler_event {
HandlerEvent::Pipeline(target) => {
// trigger pipeline and start polling it
this.pipeline.on_action(BackfillAction::Start(target));
continue 'outer
}
HandlerEvent::Event(ev) => {
// bubble up the event
return Poll::Ready(ChainEvent::Handler(ev));
}
}
Poll::Pending => {
// no more events to process
break 'outer
}
}
Poll::Pending => {
// no more events to process
break 'outer
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/engine/tree/src/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub enum DownloadOutcome {
Blocks(Vec<SealedBlockWithSenders>),
}

/// Basic [BlockDownloader].
/// Basic [`BlockDownloader`].
pub struct BasicBlockDownloader<Client>
where
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
Expand Down Expand Up @@ -253,7 +253,7 @@ impl From<OrderedSealedBlockWithSenders> for SealedBlockWithSenders {
}
}

/// A [BlockDownloader] that does nothing.
/// A [`BlockDownloader`] that does nothing.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct NoopBlockDownloader;
Expand Down
54 changes: 26 additions & 28 deletions crates/engine/tree/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
//! An engine API handler for the chain.

use crate::{
chain::{ChainHandler, FromOrchestrator, HandlerEvent, OrchestratorState},
chain::{ChainHandler, FromOrchestrator, HandlerEvent},
download::{BlockDownloader, DownloadAction, DownloadOutcome},
tree::{EngineApiTreeHandler, TreeEvent},
};
use futures::{Stream, StreamExt};
use reth_beacon_consensus::{BeaconEngineMessage, OnForkChoiceUpdated};
use reth_beacon_consensus::BeaconEngineMessage;
use reth_engine_primitives::EngineTypes;
use reth_primitives::{SealedBlockWithSenders, B256};
use reth_rpc_types::engine::{PayloadStatus, PayloadStatusEnum};
use std::{
collections::{HashSet, VecDeque},
collections::HashSet,
task::{Context, Poll},
};
use tokio::sync::mpsc;
use tracing::trace;

/// Advances the chain based on incoming requests.
///
Expand Down Expand Up @@ -46,7 +43,7 @@ pub struct EngineHandler<T, S, D> {

impl<T, S, D> EngineHandler<T, S, D> {
/// Creates a new [`EngineHandler`] with the given handler and downloader.
pub fn new(handler: T, downloader: D, incoming_requests: S) -> Self
pub const fn new(handler: T, downloader: D, incoming_requests: S) -> Self
where
T: EngineRequestHandler,
{
Expand All @@ -70,31 +67,26 @@ where
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<HandlerEvent<Self::Event>> {
loop {
// drain the handler first
loop {
match self.handler.poll(cx) {
Poll::Ready(ev) => {
match ev {
RequestHandlerEvent::Idle => break,
RequestHandlerEvent::HandlerEvent(ev) => {
return match ev {
HandlerEvent::Pipeline(target) => {
// bubble up pipeline request
self.downloader.on_action(DownloadAction::Clear);
Poll::Ready(HandlerEvent::Pipeline(target))
}
HandlerEvent::Event(ev) => {
// bubble up the event
Poll::Ready(HandlerEvent::Event(ev))
}
}
while let Poll::Ready(ev) = self.handler.poll(cx) {
match ev {
RequestHandlerEvent::Idle => break,
RequestHandlerEvent::HandlerEvent(ev) => {
return match ev {
HandlerEvent::Pipeline(target) => {
// bubble up pipeline request
self.downloader.on_action(DownloadAction::Clear);
Poll::Ready(HandlerEvent::Pipeline(target))
}
RequestHandlerEvent::Download(req) => {
// delegate download request to the downloader
self.downloader.on_action(DownloadAction::Download(req));
HandlerEvent::Event(ev) => {
// bubble up the event
Poll::Ready(HandlerEvent::Event(ev))
}
}
}
Poll::Pending => break,
RequestHandlerEvent::Download(req) => {
// delegate download request to the downloader
self.downloader.on_action(DownloadAction::Download(req));
}
}
}

Expand Down Expand Up @@ -185,8 +177,11 @@ pub enum EngineApiEvent {}

#[derive(Debug)]
pub enum FromEngine<Req> {
/// Event from the top level orchestrator.
Event(FromOrchestrator),
/// Request from the engine
Request(Req),
/// Downloaded blocks from the network.
DownloadedBlocks(Vec<SealedBlockWithSenders>),
}

Expand All @@ -199,8 +194,11 @@ impl<Req> From<FromOrchestrator> for FromEngine<Req> {
/// Requests produced by a [`EngineRequestHandler`].
#[derive(Debug)]
pub enum RequestHandlerEvent<T> {
/// The handler is idle.
Idle,
/// An event emitted by the handler.
HandlerEvent(HandlerEvent<T>),
/// Request to download blocks.
Download(DownloadRequest),
}

Expand Down
1 change: 1 addition & 0 deletions crates/engine/tree/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
// #![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![allow(missing_docs, dead_code, missing_debug_implementations, unused_variables)] // TODO rm

/// Re-export of the blockchain tree API.
pub use reth_blockchain_tree_api::*;
Expand Down
2 changes: 1 addition & 1 deletion crates/engine/tree/src/tree/memory_overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct MemoryOverlayStateProvider<H> {

impl<H> MemoryOverlayStateProvider<H> {
/// Create new memory overlay state provider.
pub fn new(in_memory: Vec<ExecutedBlock>, historical: H) -> Self {
pub const fn new(in_memory: Vec<ExecutedBlock>, historical: H) -> Self {
Self { in_memory, historical }
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub struct TreeOutcome<T> {

impl<T> TreeOutcome<T> {
/// Create new tree outcome.
pub fn new(outcome: T) -> Self {
pub const fn new(outcome: T) -> Self {
Self { outcome, event: None }
}

Expand Down Expand Up @@ -518,7 +518,7 @@ where
PayloadStatus::from_status(PayloadStatusEnum::Syncing)
} else {
let mut latest_valid_hash = None;
let status = match self.insert_block_without_senders(block.clone()).unwrap() {
let status = match self.insert_block_without_senders(block).unwrap() {
InsertPayloadOk::Inserted(BlockStatus::Valid(_)) |
InsertPayloadOk::AlreadySeen(BlockStatus::Valid(_)) => {
latest_valid_hash = Some(block_hash);
Expand Down