Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 33 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ edition = "2021"
members = [
"crate-template",
"crates/primitives",
"crates/net/p2p"
"crates/net/p2p",
"crates/stages"
]

[dependencies]
Expand Down
13 changes: 13 additions & 0 deletions crates/stages/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "reth-stages"
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/foundry-rs/reth"
readme = "README.md"
description = "Staged syncing primitives used in reth."

[dependencies]
reth-primitives = { path = "../primitives" }
async-trait = "0.1.57"
thiserror = "1.0.37"
107 changes: 107 additions & 0 deletions crates/stages/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#![warn(missing_debug_implementations, missing_docs, unreachable_pub)]
#![deny(unused_must_use, rust_2018_idioms)]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! Staged syncing primitives for reth.
//!
//! See [Stage] and [Pipeline].

use async_trait::async_trait;
use reth_primitives::U64;
use thiserror::Error;

mod pipeline;
pub use pipeline::*;

/// Stage execution input, see [Stage::execute].
#[derive(Clone, Copy, Debug)]
pub struct ExecInput {
/// The stage that was run before the current stage and the block number it reached.
pub previous_stage: Option<(StageId, U64)>,
/// The progress of this stage the last time it was executed.
pub stage_progress: Option<U64>,
}

/// Stage unwind input, see [Stage::unwind].
#[derive(Clone, Copy, Debug)]
pub struct UnwindInput {
/// The current highest block of the stage.
pub stage_progress: U64,
/// The block to unwind to.
pub unwind_to: U64,
/// The bad block that caused the unwind, if any.
pub bad_block: Option<U64>,
}

/// The output of a stage execution.
#[derive(Debug, PartialEq, Eq)]
pub struct ExecOutput {
/// How far the stage got.
pub stage_progress: U64,
/// Whether or not the stage is done.
pub done: bool,
/// Whether or not the stage reached the tip of the chain.
pub reached_tip: bool,
}

/// The output of a stage unwinding.
#[derive(Debug, PartialEq, Eq)]
pub struct UnwindOutput {
/// The block at which the stage has unwound to.
pub stage_progress: U64,
}

/// A stage execution error.
#[derive(Error, Debug)]
pub enum StageError {
/// The stage encountered a state validation error.
///
/// TODO: This depends on the consensus engine and should include the validation failure reason
#[error("Stage encountered a validation error.")]
Validation,
/// The stage encountered an internal error.
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync>),
}

/// The ID of a stage.
///
/// Each stage ID must be unique.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct StageId(pub &'static str);

/// A stage is a segmented part of the syncing process of the node.
///
/// Each stage takes care of a well-defined task, such as downloading headers or executing
/// transactions, and persist their results to a database.
///
/// Stages must have a unique [ID][StageId] and implement a way to "roll forwards"
/// ([Stage::execute]) and a way to "roll back" ([Stage::unwind]).
///
/// Stages are executed as part of a pipeline where they are executed serially.
#[async_trait]
pub trait Stage {
/// Get the ID of the stage.
///
/// Stage IDs must be unique.
fn id(&self) -> StageId;

/// Execute the stage.
async fn execute(
&mut self,
tx: &mut dyn DbTransaction,
input: ExecInput,
) -> Result<ExecOutput, StageError>;

/// Unwind the stage.
async fn unwind(
&mut self,
tx: &mut dyn DbTransaction,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>>;
}

/// TODO: Stand-in for database-related abstractions.
pub trait DbTransaction {}
103 changes: 103 additions & 0 deletions crates/stages/src/pipeline.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use crate::Stage;
use reth_primitives::U64;
use std::fmt::{Debug, Formatter};

#[allow(dead_code)]
struct QueuedStage {
/// The actual stage to execute.
stage: Box<dyn Stage>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a bit sensitive of the dynamic dispatch here but maybe it's ok

/// The unwind priority of the stage.
unwind_priority: usize,
/// Whether or not this stage can only execute when we reach what we believe to be the tip of
/// the chain.
require_tip: bool,
}

/// A staged sync pipeline.
///
/// The pipeline executes queued [stages][Stage] serially. An external component determines the tip
/// of the chain and the pipeline then executes each stage in order from the current local chain tip
/// and the external chain tip. When a stage is executed, it will run until it reaches the chain
/// tip.
///
/// After the entire pipeline has been run, it will run again unless asked to stop (see
/// [Pipeline::set_exit_after_sync]).
///
/// # Unwinding
///
/// In case of a validation error (as determined by the consensus engine) in one of the stages, the
/// pipeline will unwind the stages according to their unwind priority. It is also possible to
/// request an unwind manually (see [Pipeline::start_with_unwind]).
///
/// The unwind priority is set with [Pipeline::push_with_unwind_priority]. Stages with higher unwind
/// priorities are unwound first.
#[derive(Default)]
pub struct Pipeline {
stages: Vec<QueuedStage>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we use VecDeque here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think that would make sense since we never remove stages from here, and we only access in either a forwards fashion or sorted by unwind priority

unwind_to: Option<U64>,
max_block: Option<U64>,
exit_after_sync: bool,
}

impl Debug for Pipeline {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Pipeline")
.field("unwind_to", &self.unwind_to)
.field("max_block", &self.max_block)
.field("exit_after_sync", &self.exit_after_sync)
.finish()
}
}

impl Pipeline {
/// Add a stage to the pipeline.
///
/// # Unwinding
///
/// The unwind priority is set to 0.
pub fn push<S>(&mut self, stage: S, require_tip: bool) -> &mut Self
where
S: Stage + 'static,
{
self.push_with_unwind_priority(stage, require_tip, 0)
}

/// Add a stage to the pipeline, specifying the unwind priority.
pub fn push_with_unwind_priority<S>(
&mut self,
stage: S,
require_tip: bool,
unwind_priority: usize,
) -> &mut Self
where
S: Stage + 'static,
{
self.stages.push(QueuedStage { stage: Box::new(stage), require_tip, unwind_priority });
self
}

/// Set the target block.
///
/// Once this block is reached, syncing will stop.
pub fn set_max_block(&mut self, block: Option<U64>) -> &mut Self {
self.max_block = block;
self
}

/// Start the pipeline by unwinding to the specified block.
pub fn start_with_unwind(&mut self, unwind_to: Option<U64>) -> &mut Self {
Comment on lines 87 to 88
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does unwinding mean: I am at block 10 and unwind_to is Some(5), it will undo blocks 10, 9 ... 5 in that order? It's like canceling each stage and undoing its progress? Do we understand when this happens?

Copy link
Collaborator Author

@onbjerg onbjerg Oct 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that is correct. It happens whenever a stage encounters some sort of validation failure (e.g. invalid hash, we're on the wrong fork, state is invalid etc etc), so essentially any time ExecOutput::Unwind is returned

self.unwind_to = unwind_to;
self
}

/// Control whether the pipeline should exit after syncing.
pub fn set_exit_after_sync(&mut self, exit: bool) -> &mut Self {
self.exit_after_sync = exit;
self
}

/// Run the pipeline.
pub async fn run(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
todo!()
}
}