From 3a080f528fb4e5db15c48d9024552fc9181cac52 Mon Sep 17 00:00:00 2001 From: Oliver Nordbjerg Date: Thu, 29 Sep 2022 15:27:23 +0200 Subject: [PATCH 1/7] feat: basic staged sync crate --- crates/staged-sync/Cargo.toml | 14 ++++ crates/staged-sync/src/lib.rs | 110 +++++++++++++++++++++++++++++ crates/staged-sync/src/pipeline.rs | 98 +++++++++++++++++++++++++ 3 files changed, 222 insertions(+) create mode 100644 crates/staged-sync/Cargo.toml create mode 100644 crates/staged-sync/src/lib.rs create mode 100644 crates/staged-sync/src/pipeline.rs diff --git a/crates/staged-sync/Cargo.toml b/crates/staged-sync/Cargo.toml new file mode 100644 index 00000000000..0064440632d --- /dev/null +++ b/crates/staged-sync/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "reth-staged-sync" +version = "0.1.0" +edition = "2021" +license = "MIT OR Apache-2.0" +repository = "https://github.com/foundry-rs/reth" +readme = "README.md" +description = "Staged syncing primitives used in reth." + +[dependencies] +reth-primitives = { path = "../primitives" } + +eyre = "0.6.8" +async-trait = "0.1.57" \ No newline at end of file diff --git a/crates/staged-sync/src/lib.rs b/crates/staged-sync/src/lib.rs new file mode 100644 index 00000000000..cb16c637c5b --- /dev/null +++ b/crates/staged-sync/src/lib.rs @@ -0,0 +1,110 @@ +#![warn(missing_debug_implementations, missing_docs, unreachable_pub)] +#![deny(unused_must_use, rust_2018_idioms)] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] +//! Staged syncing primitives for reth. +//! +//! See [Stage] and [Pipeline]. + +use async_trait::async_trait; +use reth_primitives::U64; + +mod pipeline; +pub use pipeline::*; + +/// Stage execution input, see [Stage::execute]. +#[derive(Clone, Copy, Debug)] +pub struct ExecInput { + /// The stage that was run before the current stage and the block number it reached. + pub previous_stage: Option<(StageId, U64)>, + /// The progress of this stage the last time it was executed. + pub stage_progress: Option, +} + +/// Stage unwind input, see [Stage::unwind]. +#[derive(Clone, Copy, Debug)] +pub struct UnwindInput { + /// The current highest block of the stage. + pub stage_progress: U64, + /// The block to unwind to. + pub unwind_to: U64, + /// The bad block that caused the unwind, if any. + pub bad_block: Option, +} + +/// The output of a stage execution. +#[derive(Debug, PartialEq, Eq)] +pub struct ExecOutput { + /// How far the stage got. + pub stage_progress: U64, + /// Whether or not the stage is done. + pub done: bool, + /// Whether or not the stage reached the tip of the chain. + pub reached_tip: bool, +} + +/// The output of a stage unwinding. +#[derive(Debug, PartialEq, Eq)] +pub struct UnwindOutput { + /// The block at which the stage has unwound to. + pub stage_progress: U64, +} + +/// A stage execution error. +#[derive(Debug)] +pub enum StageError { + /// The stage encountered a state validation error. + /// + /// TODO: This depends on the consensus engine and should include the validation failure reason + Validation, + /// The stage encountered an internal error. + Internal(eyre::Error), +} + +impl From for StageError { + fn from(err: eyre::Error) -> Self { + StageError::Internal(err) + } +} + +/// The ID of a stage. +/// +/// Each stage ID must be unique. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct StageId(pub &'static str); + +/// A stage is a segmented part of the syncing process of the node. +/// +/// Each stage takes care of a well-defined task, such as downloading headers or executing +/// transactions, and persist their results to a database. +/// +/// Stages must have a unique [ID][StageId] and implement a way to "roll forwards" +/// ([Stage::execute]) and a way to "roll back" ([Stage::unwind]). +/// +/// Stages are executed as part of a pipeline where they are executed serially. +#[async_trait] +pub trait Stage { + /// Get the ID of the stage. + /// + /// Stage IDs must be unique. + fn id(&self) -> StageId; + + /// Execute the stage. + async fn execute( + &mut self, + tx: &mut dyn Transaction, + input: ExecInput, + ) -> Result; + + /// Unwind the stage. + async fn unwind( + &mut self, + tx: &mut dyn Transaction, + input: UnwindInput, + ) -> eyre::Result; +} + +/// TODO: Stand-in for database-related abstractions. +pub trait Transaction {} diff --git a/crates/staged-sync/src/pipeline.rs b/crates/staged-sync/src/pipeline.rs new file mode 100644 index 00000000000..284fab88b41 --- /dev/null +++ b/crates/staged-sync/src/pipeline.rs @@ -0,0 +1,98 @@ +use crate::Stage; +use reth_primitives::U64; +use std::fmt::{Debug, Formatter}; + +#[allow(dead_code)] +struct QueuedStage { + stage: Box, + unwind_priority: usize, + require_tip: bool, +} + +/// A staged sync pipeline. +/// +/// The pipeline executes queued [stages][Stage] serially. An external component determines the tip +/// of the chain and the pipeline then executes each stage in order from the current local chain tip +/// and the external chain tip. When a stage is executed, it will run until it reaches the chain +/// tip. +/// +/// After the entire pipeline has been run, it will run again unless asked to stop (see +/// [Pipeline::set_exit_after_sync]). +/// +/// # Unwinding +/// +/// In case of a validation error (as determined by the consensus engine) in one of the stages, the +/// pipeline will unwind the stages according to their unwind priority. It is also possible to +/// request an unwind manually (see [Pipeline::start_with_unwind]). +/// +/// The unwind priority is set with [Pipeline::push_with_unwind_priority]. +#[derive(Default)] +pub struct Pipeline { + stages: Vec, + unwind_to: Option, + max_block: Option, + exit_after_sync: bool, +} + +impl Debug for Pipeline { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Pipeline") + .field("unwind_to", &self.unwind_to) + .field("max_block", &self.max_block) + .field("exit_after_sync", &self.exit_after_sync) + .finish() + } +} + +impl Pipeline { + /// Add a stage to the pipeline. + /// + /// # Unwinding + /// + /// The unwind priority is set to 0. + pub fn push(&mut self, stage: S, require_tip: bool) -> &mut Self + where + S: Stage + 'static, + { + self.push_with_unwind_priority(stage, require_tip, 0) + } + + /// Add a stage to the pipeline, specifying the unwind priority. + pub fn push_with_unwind_priority( + &mut self, + stage: S, + require_tip: bool, + unwind_priority: usize, + ) -> &mut Self + where + S: Stage + 'static, + { + self.stages.push(QueuedStage { stage: Box::new(stage), require_tip, unwind_priority }); + self + } + + /// Set the target block. + /// + /// Once this block is reached, syncing will stop. + pub fn set_max_block(&mut self, block: Option) -> &mut Self { + self.max_block = block; + self + } + + /// Start the pipeline by unwinding to the specified block. + pub fn start_with_unwind(&mut self, unwind_to: Option) -> &mut Self { + self.unwind_to = unwind_to; + self + } + + /// Control whether the pipeline should exit after syncing. + pub fn set_exit_after_sync(&mut self, exit: bool) -> &mut Self { + self.exit_after_sync = exit; + self + } + + /// Run the pipeline. + pub async fn run(&mut self) -> eyre::Result<()> { + todo!() + } +} From 176e446831ac9c5c3b5eb9741918eb4313fa1175 Mon Sep 17 00:00:00 2001 From: Oliver Nordbjerg Date: Fri, 30 Sep 2022 13:35:57 +0200 Subject: [PATCH 2/7] refactor: rename to `reth-stages` --- crates/{staged-sync => stages}/Cargo.toml | 2 +- crates/{staged-sync => stages}/src/lib.rs | 0 crates/{staged-sync => stages}/src/pipeline.rs | 0 3 files changed, 1 insertion(+), 1 deletion(-) rename crates/{staged-sync => stages}/Cargo.toml (92%) rename crates/{staged-sync => stages}/src/lib.rs (100%) rename crates/{staged-sync => stages}/src/pipeline.rs (100%) diff --git a/crates/staged-sync/Cargo.toml b/crates/stages/Cargo.toml similarity index 92% rename from crates/staged-sync/Cargo.toml rename to crates/stages/Cargo.toml index 0064440632d..ec66e446f57 100644 --- a/crates/staged-sync/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "reth-staged-sync" +name = "reth-stages" version = "0.1.0" edition = "2021" license = "MIT OR Apache-2.0" diff --git a/crates/staged-sync/src/lib.rs b/crates/stages/src/lib.rs similarity index 100% rename from crates/staged-sync/src/lib.rs rename to crates/stages/src/lib.rs diff --git a/crates/staged-sync/src/pipeline.rs b/crates/stages/src/pipeline.rs similarity index 100% rename from crates/staged-sync/src/pipeline.rs rename to crates/stages/src/pipeline.rs From 2d7cabfc09304e21b136ed26d3c0f7bd69136a66 Mon Sep 17 00:00:00 2001 From: Oliver Nordbjerg Date: Fri, 30 Sep 2022 13:38:05 +0200 Subject: [PATCH 3/7] feat: remove eyre --- crates/stages/Cargo.toml | 1 - crates/stages/src/lib.rs | 10 ++-------- crates/stages/src/pipeline.rs | 2 +- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index ec66e446f57..bfa5c6f0a3f 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -10,5 +10,4 @@ description = "Staged syncing primitives used in reth." [dependencies] reth-primitives = { path = "../primitives" } -eyre = "0.6.8" async-trait = "0.1.57" \ No newline at end of file diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index cb16c637c5b..04e8fe78780 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -60,13 +60,7 @@ pub enum StageError { /// TODO: This depends on the consensus engine and should include the validation failure reason Validation, /// The stage encountered an internal error. - Internal(eyre::Error), -} - -impl From for StageError { - fn from(err: eyre::Error) -> Self { - StageError::Internal(err) - } + Internal(Box), } /// The ID of a stage. @@ -103,7 +97,7 @@ pub trait Stage { &mut self, tx: &mut dyn Transaction, input: UnwindInput, - ) -> eyre::Result; + ) -> Result>; } /// TODO: Stand-in for database-related abstractions. diff --git a/crates/stages/src/pipeline.rs b/crates/stages/src/pipeline.rs index 284fab88b41..502203d2554 100644 --- a/crates/stages/src/pipeline.rs +++ b/crates/stages/src/pipeline.rs @@ -92,7 +92,7 @@ impl Pipeline { } /// Run the pipeline. - pub async fn run(&mut self) -> eyre::Result<()> { + pub async fn run(&mut self) -> Result<(), Box> { todo!() } } From 42969a20a80e060e598b15ad178058ca852bd2de Mon Sep 17 00:00:00 2001 From: Oliver Nordbjerg Date: Fri, 30 Sep 2022 13:39:32 +0200 Subject: [PATCH 4/7] feat: impl `Error` for `StageError` --- crates/stages/Cargo.toml | 3 ++- crates/stages/src/lib.rs | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index bfa5c6f0a3f..c3dca21b18d 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -10,4 +10,5 @@ description = "Staged syncing primitives used in reth." [dependencies] reth-primitives = { path = "../primitives" } -async-trait = "0.1.57" \ No newline at end of file +async-trait = "0.1.57" +thiserror = "1.0.37" \ No newline at end of file diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 04e8fe78780..cf94e12c3f0 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -10,6 +10,7 @@ use async_trait::async_trait; use reth_primitives::U64; +use thiserror::Error; mod pipeline; pub use pipeline::*; @@ -53,13 +54,15 @@ pub struct UnwindOutput { } /// A stage execution error. -#[derive(Debug)] +#[derive(Error, Debug)] pub enum StageError { /// The stage encountered a state validation error. /// /// TODO: This depends on the consensus engine and should include the validation failure reason + #[error("Stage encountered a validation error.")] Validation, /// The stage encountered an internal error. + #[error(transparent)] Internal(Box), } From 7e6f228cfd6eae7619a2002a485e6b00292f5d6e Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Fri, 30 Sep 2022 14:18:54 -0700 Subject: [PATCH 5/7] chore: bump cargo lock --- Cargo.lock | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ee140b7b4a9..f02ec78af99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -171,9 +171,9 @@ dependencies = [ [[package]] name = "ecdsa" -version = "0.14.7" +version = "0.14.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85789ce7dfbd0f0624c07ef653a08bb2ebf43d3e16531361f46d36dd54334fed" +checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c" dependencies = [ "der", "elliptic-curve", @@ -247,7 +247,7 @@ dependencies = [ [[package]] name = "ethers-core" version = "0.17.0" -source = "git+https://github.com/gakonst/ethers-rs#d8791482d566e2203ab6a178524f1ed6705fe274" +source = "git+https://github.com/gakonst/ethers-rs#b2fc9fdf50d6fe3e81de0ac5648a068425cf87a7" dependencies = [ "arrayvec", "bytes", @@ -422,9 +422,9 @@ checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754" [[package]] name = "k256" -version = "0.11.5" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3636d281d46c3b64182eb3a0a42b7b483191a2ecc3f05301fa67403f7c9bc949" +checksum = "72c1e0b51e7ec0a97369623508396067a486bd0cbed95a2659a4b863d28cfc8b" dependencies = [ "cfg-if", "ecdsa", @@ -441,9 +441,9 @@ checksum = "f9b7d56ba4a8344d6be9729995e6b06f928af29998cdf79fe390cbf6b1fee838" [[package]] name = "libc" -version = "0.2.133" +version = "0.2.134" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0f80d65747a3e43d1596c7c5492d95d5edddaabd45a7fcdb02b95f644164966" +checksum = "329c933548736bc49fd575ee68c89e8be4d260064184389a5b77517cddd99ffb" [[package]] name = "memchr" @@ -568,9 +568,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.44" +version = "1.0.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7bd7356a8122b6c4a24a82b278680c73357984ca2fc79a0f9fa6dea7dced7c58" +checksum = "94e2ef8dbfc347b10c094890f778ee2e36ca9bb4262e86dc99cd217e35f3470b" dependencies = [ "unicode-ident", ] @@ -856,18 +856,18 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "thiserror" -version = "1.0.36" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a99cb8c4b9a8ef0e7907cd3b617cc8dc04d571c4e73c8ae403d80ac160bb122" +checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.36" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a891860d3c8d66fec8e73ddb3765f90082374dbaaa833407b904a94f1a7eb43" +checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb" dependencies = [ "proc-macro2", "quote", From b5e6a6015024838f7f61fc30b0a30ee6758e3eaf Mon Sep 17 00:00:00 2001 From: Oliver Nordbjerg Date: Mon, 3 Oct 2022 14:38:30 +0200 Subject: [PATCH 6/7] chore: nits --- Cargo.lock | 20 ++++++++++++++++++++ Cargo.toml | 3 ++- crates/stages/Cargo.toml | 1 - crates/stages/src/lib.rs | 6 +++--- 4 files changed, 25 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f02ec78af99..d8258af2556 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,17 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" +[[package]] +name = "async-trait" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76464446b8bc32758d7e88ee1a804d9914cd9b1cb264c029899680b0be29826f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "auto_impl" version = "1.0.1" @@ -656,6 +667,15 @@ dependencies = [ "ethers-core", ] +[[package]] +name = "reth-stages" +version = "0.1.0" +dependencies = [ + "async-trait", + "reth-primitives", + "thiserror", +] + [[package]] name = "rfc6979" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index b22c2a824b1..506f1e54dcb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,8 @@ edition = "2021" members = [ "crate-template", "crates/primitives", - "crates/net/p2p" + "crates/net/p2p", + "crates/stages" ] [dependencies] diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index c3dca21b18d..e635d30fab9 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -9,6 +9,5 @@ description = "Staged syncing primitives used in reth." [dependencies] reth-primitives = { path = "../primitives" } - async-trait = "0.1.57" thiserror = "1.0.37" \ No newline at end of file diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index cf94e12c3f0..7edf87e0829 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -91,17 +91,17 @@ pub trait Stage { /// Execute the stage. async fn execute( &mut self, - tx: &mut dyn Transaction, + tx: &mut dyn DbTransaction, input: ExecInput, ) -> Result; /// Unwind the stage. async fn unwind( &mut self, - tx: &mut dyn Transaction, + tx: &mut dyn DbTransaction, input: UnwindInput, ) -> Result>; } /// TODO: Stand-in for database-related abstractions. -pub trait Transaction {} +pub trait DbTransaction {} From d76b36831cd4b882cabdbb4ca121f7138ad3e07b Mon Sep 17 00:00:00 2001 From: Oliver Nordbjerg Date: Mon, 3 Oct 2022 14:39:44 +0200 Subject: [PATCH 7/7] docs: clarify unwind --- crates/stages/src/pipeline.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/stages/src/pipeline.rs b/crates/stages/src/pipeline.rs index 502203d2554..7fb874f1497 100644 --- a/crates/stages/src/pipeline.rs +++ b/crates/stages/src/pipeline.rs @@ -4,8 +4,12 @@ use std::fmt::{Debug, Formatter}; #[allow(dead_code)] struct QueuedStage { + /// The actual stage to execute. stage: Box, + /// The unwind priority of the stage. unwind_priority: usize, + /// Whether or not this stage can only execute when we reach what we believe to be the tip of + /// the chain. require_tip: bool, } @@ -25,7 +29,8 @@ struct QueuedStage { /// pipeline will unwind the stages according to their unwind priority. It is also possible to /// request an unwind manually (see [Pipeline::start_with_unwind]). /// -/// The unwind priority is set with [Pipeline::push_with_unwind_priority]. +/// The unwind priority is set with [Pipeline::push_with_unwind_priority]. Stages with higher unwind +/// priorities are unwound first. #[derive(Default)] pub struct Pipeline { stages: Vec,