Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/consensus/beacon/src/engine/invalid_headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ pub struct InvalidHeaderCache {
}

impl InvalidHeaderCache {
pub(crate) fn new(max_length: u32) -> Self {
/// Invalid header cache constructor.
pub fn new(max_length: u32) -> Self {
Self { headers: LruMap::new(ByLength::new(max_length)), metrics: Default::default() }
}

Expand Down
9 changes: 6 additions & 3 deletions crates/engine/tree/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::{
chain::{ChainHandler, FromOrchestrator, HandlerEvent},
download::{BlockDownloader, DownloadAction, DownloadOutcome},
tree::TreeEvent,
};
use futures::{Stream, StreamExt};
use reth_beacon_consensus::BeaconEngineMessage;
Expand Down Expand Up @@ -150,7 +151,6 @@ pub struct EngineApiRequestHandler<T: EngineTypes> {
to_tree: Sender<FromEngine<BeaconEngineMessage<T>>>,
/// channel to receive messages from the tree.
from_tree: UnboundedReceiver<EngineApiEvent>,
// TODO add db controller
}

impl<T> EngineApiRequestHandler<T>
Expand Down Expand Up @@ -178,13 +178,16 @@ where
}

fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>> {
todo!("poll tree and handle db")
todo!("poll tree")
}
}

/// Events emitted by the engine API handler.
#[derive(Debug)]
pub enum EngineApiEvent {}
pub enum EngineApiEvent {
/// Bubbled from tree.
FromTree(TreeEvent),
}

#[derive(Debug)]
pub enum FromEngine<Req> {
Expand Down
135 changes: 123 additions & 12 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use crate::{backfill::BackfillAction, engine::DownloadRequest};
use reth_beacon_consensus::{ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated};
use crate::{
backfill::BackfillAction,
chain::FromOrchestrator,
engine::{DownloadRequest, EngineApiEvent, FromEngine},
};
use reth_beacon_consensus::{
BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated,
};
use reth_blockchain_tree::{
error::InsertBlockErrorKind, BlockAttachment, BlockBuffer, BlockStatus,
};
use reth_blockchain_tree_api::{error::InsertBlockError, InsertPayloadOk};
use reth_consensus::{Consensus, PostExecutionInput};
use reth_engine_primitives::EngineTypes;
use reth_errors::{ConsensusError, ProviderResult};
use reth_errors::{ConsensusError, ProviderResult, RethError};
use reth_evm::execute::{BlockExecutorProvider, Executor};
use reth_payload_primitives::PayloadTypes;
use reth_payload_validator::ExecutionPayloadValidator;
Expand All @@ -27,8 +33,9 @@ use reth_trie::{updates::TrieUpdates, HashedPostState};
use std::{
collections::{BTreeMap, HashMap},
marker::PhantomData,
sync::Arc,
sync::{mpsc::Receiver, Arc},
};
use tokio::sync::mpsc::UnboundedSender;
use tracing::*;

mod memory_overlay;
Expand Down Expand Up @@ -72,7 +79,7 @@ impl ExecutedBlock {
}

/// Keeps track of the state of the tree.
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct TreeState {
/// All executed blocks by hash.
blocks_by_hash: HashMap<B256, ExecutedBlock>,
Expand Down Expand Up @@ -129,11 +136,22 @@ pub struct EngineApiTreeState {
invalid_headers: InvalidHeaderCache,
}

impl EngineApiTreeState {
fn new(block_buffer_limit: u32, max_invalid_header_cache_length: u32) -> Self {
Self {
invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
buffer: BlockBuffer::new(block_buffer_limit),
tree_state: TreeState::default(),
forkchoice_state_tracker: ForkchoiceStateTracker::default(),
}
}
}

/// The type responsible for processing engine API requests.
///
/// TODO: design: should the engine handler functions also accept the response channel or return the
/// result and the caller redirects the response
pub trait EngineApiTreeHandler: Send + Sync {
pub trait EngineApiTreeHandler {
/// The engine type that this handler is for.
type Engine: EngineTypes;

Expand Down Expand Up @@ -170,7 +188,7 @@ pub trait EngineApiTreeHandler: Send + Sync {
&mut self,
state: ForkchoiceState,
attrs: Option<<Self::Engine as PayloadTypes>::PayloadAttributes>,
) -> TreeOutcome<Result<OnForkChoiceUpdated, String>>;
) -> TreeOutcome<Result<OnForkChoiceUpdated, RethError>>;
}

/// The outcome of a tree operation.
Expand Down Expand Up @@ -220,17 +238,110 @@ pub struct EngineApiTreeHandlerImpl<P, E, T: EngineTypes> {
consensus: Arc<dyn Consensus>,
payload_validator: ExecutionPayloadValidator,
state: EngineApiTreeState,
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
outgoing: UnboundedSender<EngineApiEvent>,
/// (tmp) The flag indicating whether the pipeline is active.
is_pipeline_active: bool,
_marker: PhantomData<T>,
}

impl<P, E, T> EngineApiTreeHandlerImpl<P, E, T>
where
P: BlockReader + StateProviderFactory,
P: BlockReader + StateProviderFactory + Clone + 'static,
E: BlockExecutorProvider,
T: EngineTypes,
T: EngineTypes + 'static,
{
#[allow(clippy::too_many_arguments)]
fn new(
provider: P,
executor_provider: E,
consensus: Arc<dyn Consensus>,
payload_validator: ExecutionPayloadValidator,
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
outgoing: UnboundedSender<EngineApiEvent>,
state: EngineApiTreeState,
) -> Self {
Self {
provider,
executor_provider,
consensus,
payload_validator,
incoming,
outgoing,
is_pipeline_active: false,
state,
_marker: PhantomData,
}
}

#[allow(clippy::too_many_arguments)]
fn spawn_new(
provider: P,
executor_provider: E,
consensus: Arc<dyn Consensus>,
payload_validator: ExecutionPayloadValidator,
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
state: EngineApiTreeState,
) -> UnboundedSender<EngineApiEvent> {
let (outgoing, rx) = tokio::sync::mpsc::unbounded_channel();
let task = Self::new(
provider,
executor_provider,
consensus,
payload_validator,
incoming,
outgoing.clone(),
state,
);
std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
outgoing
}

fn run(mut self) {
loop {
while let Ok(msg) = self.incoming.recv() {
match msg {
FromEngine::Event(event) => match event {
FromOrchestrator::BackfillSyncFinished => {
todo!()
}
FromOrchestrator::BackfillSyncStarted => {
todo!()
}
},
FromEngine::Request(request) => match request {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
let output = self.on_forkchoice_updated(state, payload_attrs);
if let Err(err) = tx.send(output.outcome) {
error!("Failed to send event: {err:?}");
}
}
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => {
let output = self.on_new_payload(payload, cancun_fields);
if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(|e| {
reth_beacon_consensus::BeaconOnNewPayloadError::Internal(Box::new(
e,
))
})) {
error!("Failed to send event: {err:?}");
}
}
BeaconEngineMessage::TransitionConfigurationExchanged => {
todo!()
}
},
FromEngine::DownloadedBlocks(blocks) => {
if let Some(event) = self.on_downloaded(blocks) {
if let Err(err) = self.outgoing.send(EngineApiEvent::FromTree(event)) {
error!("Failed to send event: {err:?}");
}
}
}
}
}
}
}

/// Return block from database or in-memory state by hash.
fn block_by_hash(&self, hash: B256) -> ProviderResult<Option<Block>> {
// check database first
Expand Down Expand Up @@ -463,9 +574,9 @@ where

impl<P, E, T> EngineApiTreeHandler for EngineApiTreeHandlerImpl<P, E, T>
where
P: BlockReader + StateProviderFactory + Clone,
P: BlockReader + StateProviderFactory + Clone + 'static,
E: BlockExecutorProvider,
T: EngineTypes,
T: EngineTypes + 'static,
{
type Engine = T;

Expand Down Expand Up @@ -588,7 +699,7 @@ where
&mut self,
state: ForkchoiceState,
attrs: Option<<Self::Engine as PayloadTypes>::PayloadAttributes>,
) -> TreeOutcome<Result<OnForkChoiceUpdated, String>> {
) -> TreeOutcome<Result<OnForkChoiceUpdated, RethError>> {
todo!()
}
}