From cad33a1209534548fdb8f599b9b4e5681175e426 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Mon, 1 Jul 2024 16:32:56 +0200 Subject: [PATCH] chore: rename pipeline references to backfill sync --- crates/engine/tree/src/backfill.rs | 2 +- crates/engine/tree/src/chain.rs | 74 +++++++++++++++--------------- crates/engine/tree/src/download.rs | 8 ++-- crates/engine/tree/src/engine.rs | 6 +-- 4 files changed, 45 insertions(+), 45 deletions(-) diff --git a/crates/engine/tree/src/backfill.rs b/crates/engine/tree/src/backfill.rs index ec0b4ef06cc..3060ddd1d4e 100644 --- a/crates/engine/tree/src/backfill.rs +++ b/crates/engine/tree/src/backfill.rs @@ -239,7 +239,7 @@ mod tests { .build(), ); - // force the pipeline to be "done" after 5 blocks + // force the pipeline to be "done" after `pipeline_done_after` blocks let pipeline = TestPipelineBuilder::new() .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(BlockNumber::from(pipeline_done_after)), diff --git a/crates/engine/tree/src/chain.rs b/crates/engine/tree/src/chain.rs index 97c6d615c11..e3f764beab6 100644 --- a/crates/engine/tree/src/chain.rs +++ b/crates/engine/tree/src/chain.rs @@ -13,12 +13,12 @@ use std::{ /// /// ## Control flow /// -/// The [`ChainOrchestrator`] is responsible for controlling the pipeline sync and additional hooks. +/// The [`ChainOrchestrator`] is responsible for controlling the backfill sync and additional hooks. /// It polls the given `handler`, which is responsible for advancing the chain, how is up to the /// handler. However, due to database restrictions (e.g. exclusive write access), following /// invariants apply: /// - If the handler requests a backfill run (e.g. [`BackfillAction::Start`]), the handler must -/// ensure that while the pipeline is running, no other write access is granted. +/// ensure that while the backfill sync is running, no other write access is granted. /// - At any time the [`ChainOrchestrator`] can request exclusive write access to the database /// (e.g. if pruning is required), but will not do so until the handler has acknowledged the /// request for write access. @@ -35,8 +35,8 @@ where { /// The handler for advancing the chain. handler: T, - /// Controls pipeline sync. - pipeline: P, + /// Controls backfill sync. + backfill_sync: P, } impl ChainOrchestrator @@ -44,9 +44,9 @@ where T: ChainHandler + Unpin, P: BackfillSync + Unpin, { - /// Creates a new [`ChainOrchestrator`] with the given handler and pipeline. - pub const fn new(handler: T, pipeline: P) -> Self { - Self { handler, pipeline } + /// Creates a new [`ChainOrchestrator`] with the given handler and backfill sync. + pub const fn new(handler: T, backfill_sync: P) -> Self { + Self { handler, backfill_sync } } /// Returns the handler @@ -68,34 +68,34 @@ where // This loop polls the components // - // 1. Polls the pipeline to completion, if active. + // 1. Polls the backfill sync to completion, if active. // 2. Advances the chain by polling the handler. 'outer: loop { - // try to poll the pipeline to completion, if active - match this.pipeline.poll(cx) { - Poll::Ready(pipeline_event) => match pipeline_event { + // try to poll the backfill sync to completion, if active + match this.backfill_sync.poll(cx) { + Poll::Ready(backfill_sync_event) => match backfill_sync_event { BackfillEvent::Idle => {} BackfillEvent::Started(_) => { - // notify handler that pipeline started - this.handler.on_event(FromOrchestrator::PipelineStarted); - return Poll::Ready(ChainEvent::PipelineStarted); + // notify handler that backfill sync started + this.handler.on_event(FromOrchestrator::BackfillSyncStarted); + return Poll::Ready(ChainEvent::BackfillSyncStarted); } BackfillEvent::Finished(res) => { return match res { Ok(event) => { - tracing::debug!(?event, "pipeline finished"); - // notify handler that pipeline finished - this.handler.on_event(FromOrchestrator::PipelineFinished); - Poll::Ready(ChainEvent::PipelineFinished) + tracing::debug!(?event, "backfill sync finished"); + // notify handler that backfill sync finished + this.handler.on_event(FromOrchestrator::BackfillSyncFinished); + Poll::Ready(ChainEvent::BackfillSyncFinished) } Err(err) => { - tracing::error!( %err, "pipeline failed"); + tracing::error!( %err, "backfill sync failed"); Poll::Ready(ChainEvent::FatalError) } } } BackfillEvent::TaskDropped(err) => { - tracing::error!( %err, "pipeline task dropped"); + tracing::error!( %err, "backfill sync task dropped"); return Poll::Ready(ChainEvent::FatalError); } }, @@ -106,9 +106,9 @@ where match this.handler.poll(cx) { Poll::Ready(handler_event) => { match handler_event { - HandlerEvent::Pipeline(target) => { - // trigger pipeline and start polling it - this.pipeline.on_action(BackfillAction::Start(target)); + HandlerEvent::BackfillSync(target) => { + // trigger backfill sync and start polling it + this.backfill_sync.on_action(BackfillAction::Start(target)); continue 'outer } HandlerEvent::Event(ev) => { @@ -153,10 +153,10 @@ enum SyncMode { /// These are meant to be used for observability and debugging purposes. #[derive(Debug)] pub enum ChainEvent { - /// Pipeline sync started - PipelineStarted, - /// Pipeline sync finished - PipelineFinished, + /// Backfill sync started + BackfillSyncStarted, + /// Backfill sync finished + BackfillSyncFinished, /// Fatal error FatalError, /// Event emitted by the handler @@ -180,8 +180,8 @@ pub trait ChainHandler: Send + Sync { /// Events/Requests that the [`ChainHandler`] can emit to the [`ChainOrchestrator`]. #[derive(Clone, Debug)] pub enum HandlerEvent { - /// Request to start a pipeline sync - Pipeline(PipelineTarget), + /// Request to start a backfill sync + BackfillSync(PipelineTarget), /// Other event emitted by the handler Event(T), } @@ -189,26 +189,26 @@ pub enum HandlerEvent { /// Internal events issued by the [`ChainOrchestrator`]. #[derive(Clone, Debug)] pub enum FromOrchestrator { - /// Invoked when pipeline sync finished - PipelineFinished, - /// Invoked when pipeline started - PipelineStarted, + /// Invoked when backfill sync finished + BackfillSyncFinished, + /// Invoked when backfill sync started + BackfillSyncStarted, } /// Represents the state of the chain. #[derive(Clone, Copy, PartialEq, Eq, Default, Debug)] pub enum OrchestratorState { /// Orchestrator has exclusive write access to the database. - PipelineActive, + BackfillSyncActive, /// Node is actively processing the chain. #[default] Idle, } impl OrchestratorState { - /// Returns `true` if the state is [`OrchestratorState::PipelineActive`]. - pub const fn is_pipeline_active(&self) -> bool { - matches!(self, Self::PipelineActive) + /// Returns `true` if the state is [`OrchestratorState::BackfillSyncActive`]. + pub const fn is_backfill_sync_active(&self) -> bool { + matches!(self, Self::BackfillSyncActive) } /// Returns `true` if the state is [`OrchestratorState::Idle`]. diff --git a/crates/engine/tree/src/download.rs b/crates/engine/tree/src/download.rs index 12b2bd18928..b8ebb8415c8 100644 --- a/crates/engine/tree/src/download.rs +++ b/crates/engine/tree/src/download.rs @@ -20,7 +20,7 @@ use tracing::trace; /// A trait that can download blocks on demand. pub trait BlockDownloader: Send + Sync { /// Handle an action. - fn on_action(&mut self, event: DownloadAction); + fn on_action(&mut self, action: DownloadAction); /// Advance in progress requests if any fn poll(&mut self, cx: &mut Context<'_>) -> Poll; @@ -65,7 +65,7 @@ where Client: HeadersClient + BodiesClient + Clone + Unpin + 'static, { /// Create a new instance - pub(crate) fn new(client: Client, consensus: Arc) -> Self { + pub fn new(client: Client, consensus: Arc) -> Self { Self { full_block_client: FullBlockClient::new(client, consensus), inflight_full_block_requests: Vec::new(), @@ -154,8 +154,8 @@ where Client: HeadersClient + BodiesClient + Clone + Unpin + 'static, { /// Handles incoming download actions. - fn on_action(&mut self, event: DownloadAction) { - match event { + fn on_action(&mut self, action: DownloadAction) { + match action { DownloadAction::Clear => self.clear(), DownloadAction::Download(request) => self.download(request), } diff --git a/crates/engine/tree/src/engine.rs b/crates/engine/tree/src/engine.rs index d77162e4cd0..2010c3768a4 100644 --- a/crates/engine/tree/src/engine.rs +++ b/crates/engine/tree/src/engine.rs @@ -72,10 +72,10 @@ where RequestHandlerEvent::Idle => break, RequestHandlerEvent::HandlerEvent(ev) => { return match ev { - HandlerEvent::Pipeline(target) => { - // bubble up pipeline request + HandlerEvent::BackfillSync(target) => { + // bubble up backfill sync request request self.downloader.on_action(DownloadAction::Clear); - Poll::Ready(HandlerEvent::Pipeline(target)) + Poll::Ready(HandlerEvent::BackfillSync(target)) } HandlerEvent::Event(ev) => { // bubble up the event