Skip to content

Commit

Permalink
feat: pipeline builder (paradigmxyz#1017)
Browse files Browse the repository at this point in the history
Co-authored-by: Georgios Konstantopoulos <[email protected]>
  • Loading branch information
2 people authored and literallymarvellous committed Feb 2, 2023
1 parent 5602df3 commit d5aac1a
Show file tree
Hide file tree
Showing 27 changed files with 844 additions and 308 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 30 additions & 41 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,11 @@ use reth_network_api::NetworkInfo;
use reth_primitives::{BlockNumber, ChainSpec, H256};
use reth_staged_sync::{utils::init::init_genesis, Config};
use reth_stages::{
stages::{
bodies::BodyStage, execution::ExecutionStage, hashing_account::AccountHashingStage,
hashing_storage::StorageHashingStage, headers::HeaderStage, merkle::MerkleStage,
sender_recovery::SenderRecoveryStage, total_difficulty::TotalDifficultyStage,
},
PipelineEvent, StageId,
prelude::*,
stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage},
};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::select;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, info, warn};

/// Start the node
Expand Down Expand Up @@ -88,7 +83,7 @@ pub struct Command {
impl Command {
/// Execute `node` command
// TODO: RPC
pub async fn execute(&self) -> eyre::Result<()> {
pub async fn execute(self) -> eyre::Result<()> {
// Raise the fd limit of the process.
// Does not do anything on windows.
raise_fd_limit();
Expand Down Expand Up @@ -146,26 +141,20 @@ impl Command {
.start_network()
.await?;

let (sender, receiver) = tokio::sync::mpsc::channel(64);
tokio::spawn(handle_events(stream_select(
network.event_listener().map(Into::into),
ReceiverStream::new(receiver).map(Into::into),
)));

info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");

let fetch_client = Arc::new(network.fetch_client().await?);

// Spawn headers downloader
let headers_downloader = headers::task::TaskDownloader::spawn(
let header_downloader = headers::task::TaskDownloader::spawn(
headers::linear::LinearDownloadBuilder::default()
.request_limit(config.stages.headers.downloader_batch_size)
.stream_batch_size(config.stages.headers.commit_threshold as usize)
.build(consensus.clone(), fetch_client.clone()),
);

// Spawn bodies downloader
let bodies_downloader = bodies::task::TaskDownloader::spawn(
let body_downloader = bodies::task::TaskDownloader::spawn(
bodies::concurrent::ConcurrentDownloaderBuilder::default()
.with_stream_batch_size(config.stages.bodies.downloader_stream_batch_size)
.with_request_limit(config.stages.bodies.downloader_request_limit)
Expand All @@ -177,32 +166,32 @@ impl Command {
.build(fetch_client.clone(), consensus.clone(), db.clone()),
);

let mut pipeline = reth_stages::Pipeline::default()
let mut pipeline = Pipeline::builder()
.with_sync_state_updater(network.clone())
.with_channel(sender)
.push(HeaderStage {
downloader: headers_downloader,
consensus: consensus.clone(),
sync_status_updates: network.clone(),
})
.push(TotalDifficultyStage {
commit_threshold: config.stages.total_difficulty.commit_threshold,
})
.push(BodyStage { downloader: bodies_downloader, consensus: consensus.clone() })
.push(SenderRecoveryStage {
batch_size: config.stages.sender_recovery.batch_size,
commit_threshold: config.stages.sender_recovery.commit_threshold,
})
.push(ExecutionStage {
chain_spec: self.chain.clone(),
commit_threshold: config.stages.execution.commit_threshold,
})
// This Merkle stage is used only on unwind
.push(MerkleStage { is_execute: false })
.push(AccountHashingStage { clean_threshold: 500_000, commit_threshold: 100_000 })
.push(StorageHashingStage { clean_threshold: 500_000, commit_threshold: 100_000 })
// This merkle stage is used only for execute
.push(MerkleStage { is_execute: true });
.add_stages(
OnlineStages::new(consensus.clone(), header_downloader, body_downloader).set(
TotalDifficultyStage {
commit_threshold: config.stages.total_difficulty.commit_threshold,
},
),
)
.add_stages(
OfflineStages::default()
.set(SenderRecoveryStage {
batch_size: config.stages.sender_recovery.batch_size,
commit_threshold: config.stages.execution.commit_threshold,
})
.set(ExecutionStage {
chain_spec: self.chain,
commit_threshold: config.stages.execution.commit_threshold,
}),
)
.build();

tokio::spawn(handle_events(stream_select(
network.event_listener().map(Into::into),
pipeline.events().map(Into::into),
)));

// Run pipeline
info!(target: "reth::cli", "Starting sync pipeline");
Expand Down
2 changes: 1 addition & 1 deletion bin/reth/src/stage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use reth_net_nat::NatResolver;
use reth_primitives::ChainSpec;
use reth_staged_sync::Config;
use reth_stages::{
stages::{bodies::BodyStage, execution::ExecutionStage, sender_recovery::SenderRecoveryStage},
stages::{BodyStage, ExecutionStage, SenderRecoveryStage},
ExecInput, Stage, StageId, Transaction, UnwindInput,
};

Expand Down
2 changes: 1 addition & 1 deletion bin/reth/src/test_eth_chain/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use reth_primitives::{
SealedHeader, StorageEntry, H256, U256,
};
use reth_rlp::Decodable;
use reth_stages::{stages::execution::ExecutionStage, ExecInput, Stage, StageId, Transaction};
use reth_stages::{stages::ExecutionStage, ExecInput, Stage, StageId, Transaction};
use std::{
collections::HashMap,
ffi::OsStr,
Expand Down
4 changes: 2 additions & 2 deletions crates/net/downloaders/src/headers/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ struct HeadersResponseError {
}

/// The block to which we want to close the gap: (local head...sync target]
#[derive(Debug)]
#[derive(Debug, Default)]
struct SyncTargetBlock {
/// Block hash of the targeted block
hash: H256,
Expand Down Expand Up @@ -943,7 +943,6 @@ mod tests {

let mut downloader = LinearDownloadBuilder::default()
.build(Arc::new(TestConsensus::default()), Arc::clone(&client));

downloader.update_local_head(genesis);
downloader.update_sync_target(SyncTarget::Tip(H256::random()));

Expand Down Expand Up @@ -1015,6 +1014,7 @@ mod tests {
.build(Arc::new(TestConsensus::default()), Arc::clone(&client));
downloader.update_local_head(genesis);
downloader.update_sync_target(SyncTarget::Tip(H256::random()));

downloader.next_request_block_number = start;

let mut total = 0;
Expand Down
7 changes: 4 additions & 3 deletions crates/net/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use reth_primitives::{ChainSpec, ForkFilter, NodeRecord, PeerId, MAINNET};
use reth_provider::{BlockProvider, HeaderProvider};
use reth_tasks::TaskExecutor;
use secp256k1::{SecretKey, SECP256K1};
use serde::{Deserialize, Serialize};
use std::{
collections::HashSet,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
Expand Down Expand Up @@ -120,7 +119,8 @@ where
}

/// Builder for [`NetworkConfig`](struct.NetworkConfig.html).
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[allow(missing_docs)]
pub struct NetworkConfigBuilder {
/// The node's secret key, from which the node's identity is derived.
Expand Down Expand Up @@ -338,7 +338,8 @@ impl NetworkConfigBuilder {
/// This affects block propagation in the `eth` sub-protocol [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)
///
/// In POS `NewBlockHashes` and `NewBlock` messages become invalid.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum NetworkMode {
/// Network is in proof-of-work mode.
Work,
Expand Down
19 changes: 13 additions & 6 deletions crates/stages/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,32 @@ reth-provider = { path = "../storage/provider" }

# async
tokio = { version = "1.21.2", features = ["sync"] }

tokio-stream = "0.1.10"
async-trait = "0.1.57"
thiserror = "1.0.37"
futures-util = "0.3.25"

# observability
tracing = "0.1.36"
aquamarine = "0.1.12"
metrics = "0.20.1"
futures-util = "0.3.25"

# misc
serde = { version = "1.0", optional = true }
thiserror = "1.0.37"
aquamarine = "0.1.12"
itertools = "0.10.5"
rayon = "1.6.0"

[dev-dependencies]
# reth
reth-db = { path = "../storage/db", features = ["test-utils", "mdbx"] }
reth-interfaces = { path = "../interfaces", features = ["test-utils"] }
reth-downloaders = { path = "../net/downloaders" }
reth-eth-wire = { path = "../net/eth-wire" } # TODO(onbjerg): We only need this for [BlockBody]
tokio = { version = "*", features = ["rt", "sync", "macros"] }
tokio-stream = "0.1.10"
tempfile = "3.3.0"
assert_matches = "1.5.0"
rand = "0.8.5"
paste = "1.0"

[features]
default = ["serde"]
serde = ["dep:serde"]
4 changes: 2 additions & 2 deletions crates/stages/src/id.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stages::{bodies::BODIES, headers::HEADERS};
use crate::stages::{BODIES, HEADERS};
use metrics::absolute_counter;
use reth_db::{
tables::SyncStage,
Expand All @@ -11,7 +11,7 @@ use std::fmt::Display;
/// The ID of a stage.
///
/// Each stage ID must be unique.
///
/// A `StageId` is a thin `Copy` wrapper around a `&'static str`; with the
/// `Eq + Hash` derives it can also serve as a map or set key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct StageId(pub &'static str);

impl Display for StageId {
Expand Down
49 changes: 44 additions & 5 deletions crates/stages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,48 @@
))]
//! Staged syncing primitives for reth.
//!
//! See [Stage] and [Pipeline].
//! This crate contains the syncing primitives [`Pipeline`] and [`Stage`], as well as all stages
//! that reth uses to sync.
//!
//! # Metrics
//! A pipeline can be configured using [`Pipeline::builder()`].
//!
//! This library exposes metrics via the [`metrics`][metrics_core] crate:
//! For ease of use, this crate also exposes a set of [`StageSet`]s, which are collections of stages
//! that perform specific functions during sync. Stage sets can be customized; it is possible to
//! add, disable and replace stages in the set.
//!
//! - `stage_progress{stage}`: The block number each stage has currently reached.
//! # Examples
//!
//! ```
//! # use std::sync::Arc;
//! # use reth_db::mdbx::test_utils::create_test_rw_db;
//! # use reth_db::mdbx::{Env, WriteMap};
//! # use reth_downloaders::bodies::concurrent::ConcurrentDownloaderBuilder;
//! # use reth_downloaders::headers::linear::LinearDownloadBuilder;
//! # use reth_interfaces::consensus::Consensus;
//! # use reth_interfaces::sync::NoopSyncStateUpdate;
//! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient};
//! # use reth_primitives::PeerId;
//! # use reth_stages::Pipeline;
//! # use reth_stages::sets::DefaultStages;
//! # let consensus: Arc<dyn Consensus> = Arc::new(TestConsensus::default());
//! # let headers_downloader = LinearDownloadBuilder::default().build(
//! # consensus.clone(),
//! # Arc::new(TestHeadersClient::default())
//! # );
//! # let bodies_downloader = ConcurrentDownloaderBuilder::default().build(
//! # Arc::new(TestBodiesClient { responder: |_| Ok((PeerId::zero(), vec![]).into()) }),
//! # consensus.clone(),
//! # create_test_rw_db()
//! # );
//! // Create a pipeline that can fully sync
//! # let pipeline: Pipeline<Env<WriteMap>, NoopSyncStateUpdate> =
//! Pipeline::builder()
//! .add_stages(
//! DefaultStages::new(consensus, headers_downloader, bodies_downloader)
//! )
//! .build();
//! #
//! ```
mod db;
mod error;
mod id;
Expand All @@ -24,9 +58,14 @@ mod util;
#[cfg(test)]
mod test_utils;

/// A re-export of common structs and traits.
pub mod prelude;

/// Implementations of stages.
pub mod stages;

pub mod sets;

pub use db::Transaction;
pub use error::*;
pub use id::*;
Expand Down
69 changes: 69 additions & 0 deletions crates/stages/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use crate::{pipeline::QueuedStage, Pipeline, Stage, StageSet};
use reth_db::database::Database;
use reth_interfaces::sync::{NoopSyncStateUpdate, SyncStateUpdater};
use reth_primitives::BlockNumber;

/// Builds a [`Pipeline`].
#[derive(Debug)]
#[must_use = "call `build` to construct the pipeline"]
pub struct PipelineBuilder<DB, U = NoopSyncStateUpdate>
where
    DB: Database,
    U: SyncStateUpdater,
{
    // The pipeline under construction: mutated in place by the builder
    // methods and handed back unchanged by `build`.
    pipeline: Pipeline<DB, U>,
}

impl<DB: Database, U: SyncStateUpdater> Default for PipelineBuilder<DB, U> {
fn default() -> Self {
Self { pipeline: Pipeline::default() }
}
}

impl<DB, U> PipelineBuilder<DB, U>
where
    DB: Database,
    U: SyncStateUpdater,
{
    /// Append a single stage to the end of the pipeline.
    pub fn add_stage<S>(mut self, stage: S) -> Self
    where
        S: Stage<DB> + 'static,
    {
        let queued = QueuedStage { stage: Box::new(stage) };
        self.pipeline.stages.push(queued);
        self
    }

    /// Append every stage of a set to the pipeline, in the set's order.
    ///
    /// Stages can be grouped into a set by using a [`StageSet`].
    ///
    /// To customize the stages in the set (reorder, disable, insert a stage) call
    /// [`build`][StageSet::build] on the set which will convert it to a
    /// [`StageSetBuilder`][crate::StageSetBuilder].
    pub fn add_stages<Set: StageSet<DB>>(mut self, set: Set) -> Self {
        self.pipeline
            .stages
            .extend(set.builder().build().into_iter().map(|stage| QueuedStage { stage }));
        self
    }

    /// Set the target block.
    ///
    /// The pipeline stops once this block is reached.
    pub fn with_max_block(mut self, block: BlockNumber) -> Self {
        self.pipeline.max_block = Some(block);
        self
    }

    /// Set a [SyncStateUpdater].
    pub fn with_sync_state_updater(mut self, updater: U) -> Self {
        self.pipeline.sync_state_updater = Some(updater);
        self
    }

    /// Consume the builder, returning the finished [`Pipeline`].
    pub fn build(self) -> Pipeline<DB, U> {
        self.pipeline
    }
}
Loading

0 comments on commit d5aac1a

Please sign in to comment.