feat: pipeline builder (paradigmxyz#1017)
Co-authored-by: Georgios Konstantopoulos <[email protected]>
2 people authored and literallymarvellous committed Feb 6, 2023
1 parent 0799c5c commit c50f4fd
Showing 11 changed files with 73 additions and 81 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock


55 changes: 32 additions & 23 deletions bin/reth/src/node/mod.rs
@@ -90,8 +90,6 @@ impl Command {
/// Execute `node` command
// TODO: RPC
pub async fn execute(self) -> eyre::Result<()> {
info!(target: "reth::cli", "reth {} starting", crate_version!());

// Raise the fd limit of the process.
// Does not do anything on windows.
raise_fd_limit();
@@ -190,47 +188,58 @@ impl Command {
)
}

async fn build_pipeline(
&self,
config: &Config,
network: &NetworkHandle,
consensus: &Arc<dyn Consensus>,
db: &Arc<Env<WriteMap>>,
) -> eyre::Result<Pipeline<Env<WriteMap>, NetworkHandle>> {
let fetch_client = Arc::new(network.fetch_client().await?);

let header_downloader = self.spawn_headers_downloader(config, consensus, &fetch_client);
let body_downloader = self.spawn_bodies_downloader(config, consensus, &fetch_client, db);
let stage_conf = &config.stages;
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");

let mut builder = Pipeline::builder();

if let Some(max_block) = self.max_block {
builder = builder.with_max_block(max_block)
}
// Spawn headers downloader
let header_downloader = headers::task::TaskDownloader::spawn(
headers::linear::LinearDownloadBuilder::default()
.request_limit(config.stages.headers.downloader_batch_size)
.stream_batch_size(config.stages.headers.commit_threshold as usize)
.build(consensus.clone(), fetch_client.clone()),
);

// Spawn bodies downloader
let body_downloader = bodies::task::TaskDownloader::spawn(
bodies::concurrent::ConcurrentDownloaderBuilder::default()
.with_stream_batch_size(config.stages.bodies.downloader_stream_batch_size)
.with_request_limit(config.stages.bodies.downloader_request_limit)
.with_max_buffered_responses(config.stages.bodies.downloader_max_buffered_responses)
.with_concurrent_requests_range(
config.stages.bodies.downloader_min_concurrent_requests..=
config.stages.bodies.downloader_max_concurrent_requests,
)
.build(fetch_client.clone(), consensus.clone(), db.clone()),
);

let pipeline = builder
let mut pipeline = Pipeline::builder()
.with_sync_state_updater(network.clone())
.add_stages(
OnlineStages::new(consensus.clone(), header_downloader, body_downloader).set(
TotalDifficultyStage {
commit_threshold: stage_conf.total_difficulty.commit_threshold,
commit_threshold: config.stages.total_difficulty.commit_threshold,
},
),
)
.add_stages(
OfflineStages::default()
.set(SenderRecoveryStage {
batch_size: stage_conf.sender_recovery.batch_size,
commit_threshold: stage_conf.sender_recovery.commit_threshold,
batch_size: config.stages.sender_recovery.batch_size,
commit_threshold: config.stages.execution.commit_threshold,
})
.set(ExecutionStage {
chain_spec: self.chain.clone(),
commit_threshold: stage_conf.execution.commit_threshold,
chain_spec: self.chain,
commit_threshold: config.stages.execution.commit_threshold,
}),
)
.build();

tokio::spawn(handle_events(stream_select(
network.event_listener().map(Into::into),
pipeline.events().map(Into::into),
)));

Ok(pipeline)
}

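For orientation, the builder chain this commit introduces can be reduced to a short sketch. This is not a verbatim excerpt: it assumes `network`, `consensus`, `header_downloader`, and `body_downloader` are constructed as in `build_pipeline` above, and it elides the per-stage overrides.

    use reth_stages::{sets::{OfflineStages, OnlineStages}, Pipeline};

    // Minimal sketch of the new builder API (assumptions noted above).
    let mut builder = Pipeline::builder()
        .with_sync_state_updater(network.clone()) // report sync progress back to the network
        .add_stages(OnlineStages::new(consensus.clone(), header_downloader, body_downloader))
        .add_stages(OfflineStages::default());
    if let Some(max_block) = max_block {
        builder = builder.with_max_block(max_block); // optional cap, mirroring the CLI flag
    }
    let pipeline = builder.build();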
12 changes: 12 additions & 0 deletions crates/stages/Cargo.toml
@@ -35,6 +35,7 @@ metrics = "0.20.1"
# misc
serde = { version = "1.0", optional = true }
thiserror = "1.0.37"
<<<<<<< HEAD
aquamarine = "0.2.1"
itertools = "0.10.5"
rayon = "1.6.0"
@@ -47,6 +48,11 @@ hasher = "0.1.4"
arbitrary = { version = "1.1.7", features = ["derive"], optional = true }
proptest = { version = "1.0", optional = true }

=======
aquamarine = "0.1.12"
itertools = "0.10.5"
rayon = "1.6.0"
>>>>>>> c0564c9e (feat: pipeline builder (#1017))
[dev-dependencies]
# reth
reth-primitives = { path = "../primitives", features = ["arbitrary"]}
@@ -60,6 +66,7 @@ assert_matches = "1.5.0"
rand = "0.8.5"
paste = "1.0"

<<<<<<< HEAD
# Stage benchmarks
criterion = { version = "0.4.0", features = ["async_futures"] }
proptest = { version = "1.0" }
@@ -79,3 +86,8 @@ test-utils = []
name = "criterion"
harness = false
required-features = ["test-utils"]
=======
[features]
default = ["serde"]
serde = ["dep:serde"]
>>>>>>> c0564c9e (feat: pipeline builder (#1017))
11 changes: 7 additions & 4 deletions crates/stages/src/lib.rs
@@ -22,20 +22,20 @@
//! # use std::sync::Arc;
//! # use reth_db::mdbx::test_utils::create_test_rw_db;
//! # use reth_db::mdbx::{Env, WriteMap};
//! # use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
//! # use reth_downloaders::headers::reverse_headers::ReverseHeadersDownloaderBuilder;
//! # use reth_downloaders::bodies::concurrent::ConcurrentDownloaderBuilder;
//! # use reth_downloaders::headers::linear::LinearDownloadBuilder;
//! # use reth_interfaces::consensus::Consensus;
//! # use reth_interfaces::sync::NoopSyncStateUpdate;
//! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient};
//! # use reth_primitives::PeerId;
//! # use reth_stages::Pipeline;
//! # use reth_stages::sets::DefaultStages;
//! # let consensus: Arc<dyn Consensus> = Arc::new(TestConsensus::default());
//! # let headers_downloader = ReverseHeadersDownloaderBuilder::default().build(
//! # let headers_downloader = LinearDownloadBuilder::default().build(
//! # consensus.clone(),
//! # Arc::new(TestHeadersClient::default())
//! # );
//! # let bodies_downloader = BodiesDownloaderBuilder::default().build(
//! # let bodies_downloader = ConcurrentDownloaderBuilder::default().build(
//! # Arc::new(TestBodiesClient { responder: |_| Ok((PeerId::zero(), vec![]).into()) }),
//! # consensus.clone(),
//! # create_test_rw_db()
@@ -64,6 +64,9 @@ pub mod test_utils;
/// A re-export of common structs and traits.
pub mod prelude;

/// Implementations of stages.
pub mod stages;

2 changes: 1 addition & 1 deletion crates/stages/src/pipeline/builder.rs
@@ -39,7 +39,7 @@ where
/// Stages can be grouped into a set by using a [`StageSet`].
///
/// To customize the stages in the set (reorder, disable, insert a stage) call
/// [`builder`][StageSet::builder] on the set which will convert it to a
/// [`build`][StageSet::build] on the set which will convert it to a
/// [`StageSetBuilder`][crate::StageSetBuilder].
pub fn add_stages<Set: StageSet<DB>>(mut self, set: Set) -> Self {
for stage in set.builder().build() {
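As a concrete illustration of this doc comment, the node binary above overrides `ExecutionStage` inside `OfflineStages` the same way; a hedged sketch with illustrative values (`chain_spec` assumed in scope):

    // Sketch: `set` swaps in a stage with the same ID while keeping its position.
    let pipeline = Pipeline::builder()
        .add_stages(OfflineStages::default().set(ExecutionStage {
            chain_spec: chain_spec.clone(),
            commit_threshold: 10_000, // illustrative value
        }))
        .build();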
2 changes: 1 addition & 1 deletion crates/stages/src/pipeline/set.rs
@@ -172,7 +172,7 @@ where
/// Disables the given stage.
///
/// The disabled [`Stage`] keeps its place in the set, so it can be used for ordering with
/// [`StageSetBuilder::add_before`] or [`StageSetBuilder::add_after`], or it can be re-enabled.
/// [`add_before`] or [`add_after`], or it can be re-enabled.
///
/// All stages within a [`StageSet`] are enabled by default.
///
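A sketch of the ordering workflow this doc comment describes. `MERKLE_EXECUTION` is the `StageId` defined in `stages/merkle.rs` below; `MyStage` is a hypothetical custom stage, and the exact signatures of `disable` and `add_before` are assumed here, not shown in this diff:

    // Sketch: a disabled stage keeps its slot, so it can still anchor ordering.
    let stages = OfflineStages::default()
        .builder()
        .disable(MERKLE_EXECUTION) // stays in place, just skipped
        .add_before(MyStage::default(), MERKLE_EXECUTION); // hypothetical stage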
6 changes: 3 additions & 3 deletions crates/stages/src/sets.rs
@@ -107,6 +107,7 @@ where
.add_stage(HeaderStage::new(self.header_downloader, self.consensus.clone()))
.add_stage(TotalDifficultyStage::default())
.add_stage(BodyStage { downloader: self.body_downloader, consensus: self.consensus })
.add_stage(TransactionLookupStage::default())
}
}

@@ -164,10 +165,10 @@ pub struct HashingStages;
impl<DB: Database> StageSet<DB> for HashingStages {
fn builder(self) -> StageSetBuilder<DB> {
StageSetBuilder::default()
.add_stage(MerkleStage::default_unwind())
.add_stage(MerkleStage::Unwind)
.add_stage(AccountHashingStage::default())
.add_stage(StorageHashingStage::default())
.add_stage(MerkleStage::default_execution())
.add_stage(MerkleStage::Execution)
}
}

@@ -179,7 +180,6 @@ pub struct HistoryIndexingStages;
impl<DB: Database> StageSet<DB> for HistoryIndexingStages {
fn builder(self) -> StageSetBuilder<DB> {
StageSetBuilder::default()
.add_stage(TransactionLookupStage::default())
.add_stage(IndexStorageHistoryStage::default())
.add_stage(IndexAccountHistoryStage::default())
}
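Taken together, the sets in this file nest into `DefaultStages`; a sketch assuming its constructor mirrors `OnlineStages::new` above (the exact signature is not shown in this diff):

    use reth_stages::{sets::DefaultStages, Pipeline};

    // Per this diff: OnlineStages covers headers, total difficulty, bodies, and
    // transaction lookup; OfflineStages covers sender recovery, execution,
    // hashing, and history indexing.
    let pipeline = Pipeline::builder()
        .add_stages(DefaultStages::new(consensus, header_downloader, body_downloader))
        .build();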
2 changes: 1 addition & 1 deletion crates/stages/src/stages/hashing_account.rs
@@ -15,7 +15,7 @@ use std::{
use tracing::*;

/// The [`StageId`] of the account hashing stage.
pub const ACCOUNT_HASHING: StageId = StageId("AccountHashing");
pub const ACCOUNT_HASHING: StageId = StageId("AccountHashingStage");

/// Account hashing stage hashes plain account.
/// This is preparation before generating intermediate hashes and calculating Merkle tree root.
2 changes: 1 addition & 1 deletion crates/stages/src/stages/hashing_storage.rs
@@ -13,7 +13,7 @@ use std::{collections::BTreeMap, fmt::Debug};
use tracing::*;

/// The [`StageId`] of the storage hashing stage.
pub const STORAGE_HASHING: StageId = StageId("StorageHashing");
pub const STORAGE_HASHING: StageId = StageId("StorageHashingStage");

/// Storage hashing stage hashes plain storage.
/// This is preparation before generating intermediate hashes and calculating Merkle tree root.
57 changes: 11 additions & 46 deletions crates/stages/src/stages/merkle.rs
@@ -8,10 +8,10 @@ use std::fmt::Debug;
use tracing::*;

/// The [`StageId`] of the merkle hashing execution stage.
pub const MERKLE_EXECUTION: StageId = StageId("MerkleExecute");
pub const MERKLE_EXECUTION: StageId = StageId("MerkleExecuteStage");

/// The [`StageId`] of the merkle hashing unwind stage.
pub const MERKLE_UNWIND: StageId = StageId("MerkleUnwind");
pub const MERKLE_UNWIND: StageId = StageId("MerkleUnwindStage");

/// The merkle hashing stage uses input from
/// [`AccountHashingStage`][crate::stages::AccountHashingStage] and
@@ -36,40 +36,19 @@ pub const MERKLE_UNWIND: StageId = StageId("MerkleUnwind");
/// - [`MerkleStage::Execution`]
#[derive(Debug)]
pub enum MerkleStage {
/// The execution portion of the merkle stage.
Execution {
/// The threshold for switching from incremental trie building
/// of changes to whole rebuild. Num of transitions.
clean_threshold: u64,
},
/// The unwind portion of the merkle stage.
/// The execution portion of the hashing stage.
Execution,
/// The unwind portion of the hashing stage.
Unwind,

#[cfg(test)]
Both { clean_threshold: u64 },
}

impl MerkleStage {
/// Stage default for the Execution variant.
pub fn default_execution() -> Self {
Self::Execution { clean_threshold: 5_000 }
}

/// Stage default for the Unwind variant.
pub fn default_unwind() -> Self {
Self::Unwind
}
}

#[async_trait::async_trait]
impl<DB: Database> Stage<DB> for MerkleStage {
/// Return the id of the stage
fn id(&self) -> StageId {
match self {
MerkleStage::Execution { .. } => MERKLE_EXECUTION,
MerkleStage::Execution => MERKLE_EXECUTION,
MerkleStage::Unwind => MERKLE_UNWIND,
#[cfg(test)]
MerkleStage::Both { .. } => unreachable!(),
}
}

@@ -79,24 +58,10 @@ impl<DB: Database> Stage<DB> for MerkleStage {
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let threshold = match self {
MerkleStage::Unwind => {
info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
return Ok(ExecOutput {
stage_progress: input.previous_stage_progress(),
done: true,
})
}
MerkleStage::Execution { clean_threshold } => *clean_threshold,
#[cfg(test)]
MerkleStage::Both { clean_threshold } => *clean_threshold,
};

let stage_progress = input.stage_progress.unwrap_or_default();
let previous_stage_progress = input.previous_stage_progress();

let from_transition = tx.get_block_transition(stage_progress)?;
let to_transition = tx.get_block_transition(previous_stage_progress)?;
if matches!(self, MerkleStage::Unwind) {
info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
return Ok(ExecOutput { stage_progress: input.previous_stage_progress(), done: true })
}

let block_root = tx.get_header_by_num(previous_stage_progress)?.state_root;

@@ -135,7 +100,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
if matches!(self, MerkleStage::Execution { .. }) {
if matches!(self, MerkleStage::Execution) {
info!(target: "sync::stages::merkle::exec", "Stage is always skipped");
return Ok(UnwindOutput { stage_progress: input.unwind_to })
}
2 changes: 1 addition & 1 deletion crates/stages/src/stages/sender_recovery.rs
@@ -32,7 +32,7 @@ pub struct SenderRecoveryStage {

impl Default for SenderRecoveryStage {
fn default() -> Self {
Self { batch_size: 250000, commit_threshold: 10000 }
Self { batch_size: 1000, commit_threshold: 5000 }
}
}

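The fields above can also be set explicitly instead of relying on `Default`, as `build_pipeline` does earlier in this commit; a small sketch with illustrative values:

    // Sketch: explicit tuning instead of Default::default().
    let stage = SenderRecoveryStage { batch_size: 250_000, commit_threshold: 10_000 };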
