Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion crates/stages/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,14 @@ description = "Staged syncing primitives used in reth."

[dependencies]
reth-primitives = { path = "../primitives" }
reth-db = { path = "../db" }
async-trait = "0.1.57"
thiserror = "1.0.37"
thiserror = "1.0.37"
tracing = "0.1.36"
tracing-futures = "0.2.5"
tokio = { version = "1.21.2", features = ["sync"] }

[dev-dependencies]
tokio = { version = "*", features = ["rt", "sync", "macros"] }
tokio-stream = "0.1.10"
tempfile = "3.3.0"
92 changes: 71 additions & 21 deletions crates/stages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@
//! See [Stage] and [Pipeline].

use async_trait::async_trait;
use reth_db::mdbx;
use reth_primitives::U64;
use std::fmt::Display;
use thiserror::Error;

mod pipeline;
pub use pipeline::*;

/// Stage execution input, see [Stage::execute].
#[derive(Clone, Copy, Debug)]
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct ExecInput {
/// The stage that was run before the current stage and the block number it reached.
pub previous_stage: Option<(StageId, U64)>,
Expand All @@ -25,7 +27,7 @@ pub struct ExecInput {
}

/// Stage unwind input, see [Stage::unwind].
#[derive(Clone, Copy, Debug)]
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct UnwindInput {
/// The current highest block of the stage.
pub stage_progress: U64,
Expand All @@ -36,7 +38,7 @@ pub struct UnwindInput {
}

/// The output of a stage execution.
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ExecOutput {
/// How far the stage got.
pub stage_progress: U64,
Expand All @@ -47,7 +49,7 @@ pub struct ExecOutput {
}

/// The output of a stage unwinding.
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct UnwindOutput {
/// The block at which the stage has unwound to.
pub stage_progress: U64,
Expand All @@ -59,19 +61,16 @@ pub enum StageError {
/// The stage encountered a state validation error.
///
/// TODO: This depends on the consensus engine and should include the validation failure reason
#[error("Stage encountered a validation error.")]
Validation,
#[error("Stage encountered a validation error in block {block}.")]
Validation {
/// The block that failed validation.
block: U64,
},
/// The stage encountered an internal error.
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync>),
}

/// The ID of a stage.
///
/// Each stage ID must be unique.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct StageId(pub &'static str);

/// A stage is a segmented part of the syncing process of the node.
///
/// Each stage takes care of a well-defined task, such as downloading headers or executing
Expand All @@ -82,26 +81,77 @@ pub struct StageId(pub &'static str);
///
/// Stages are executed as part of a pipeline where they are executed serially.
#[async_trait]
pub trait Stage {
pub trait Stage<'db, E>: Send + Sync
where
E: mdbx::EnvironmentKind,
{
/// Get the ID of the stage.
///
/// Stage IDs must be unique.
fn id(&self) -> StageId;

/// Execute the stage.
async fn execute(
async fn execute<'tx>(
&mut self,
tx: &mut dyn DbTransaction,
tx: &mut mdbx::Transaction<'tx, mdbx::RW, E>,
input: ExecInput,
) -> Result<ExecOutput, StageError>;
) -> Result<ExecOutput, StageError>
where
'db: 'tx;

/// Unwind the stage.
async fn unwind(
async fn unwind<'tx>(
&mut self,
tx: &mut dyn DbTransaction,
tx: &mut mdbx::Transaction<'tx, mdbx::RW, E>,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>>;
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>>
where
'db: 'tx;
}

/// TODO: Stand-in for database-related abstractions.
pub trait DbTransaction {}
/// The ID of a stage.
///
/// Each stage ID must be unique.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct StageId(pub &'static str);

impl Display for StageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

impl StageId {
/// Get the last committed progress of this stage.
pub fn get_progress<'db, K, E>(
&self,
tx: &mdbx::Transaction<'db, K, E>,
) -> Result<Option<U64>, mdbx::Error>
where
K: mdbx::TransactionKind,
E: mdbx::EnvironmentKind,
{
// TODO: Clean up when we get better database abstractions
let bytes: Option<Vec<u8>> = tx.get(&tx.open_db(Some("SyncStage"))?, self.0.as_ref())?;

Ok(bytes.map(|b| U64::from_big_endian(b.as_ref())))
}

/// Save the progress of this stage.
pub fn save_progress<'db, E>(
&self,
tx: &mdbx::Transaction<'db, mdbx::RW, E>,
block: U64,
) -> Result<(), mdbx::Error>
where
E: mdbx::EnvironmentKind,
{
// TODO: Clean up when we get better database abstractions
tx.put(
&tx.open_db(Some("SyncStage"))?,
self.0,
block.0[0].to_be_bytes(),
mdbx::WriteFlags::UPSERT,
)
}
}
Loading