Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pipeline builder #1017

Merged
merged 12 commits into from
Jan 27, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 31 additions & 41 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,12 @@ use reth_network_api::NetworkInfo;
use reth_primitives::{BlockNumber, ChainSpec, H256};
use reth_staged_sync::{utils::init::init_genesis, Config};
use reth_stages::{
stages::{
bodies::BodyStage, execution::ExecutionStage, hashing_account::AccountHashingStage,
hashing_storage::StorageHashingStage, headers::HeaderStage, merkle::MerkleStage,
sender_recovery::SenderRecoveryStage, total_difficulty::TotalDifficultyStage,
},
PipelineEvent, StageId,
sets::{OfflineStages, OnlineStages},
stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage},
Pipeline, PipelineEvent, StageId, StageSet,
};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::select;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, info, warn};

/// Start the node
Expand Down Expand Up @@ -88,7 +84,7 @@ pub struct Command {
impl Command {
/// Execute `node` command
// TODO: RPC
pub async fn execute(&self) -> eyre::Result<()> {
pub async fn execute(self) -> eyre::Result<()> {
// Raise the fd limit of the process.
// Does not do anything on windows.
raise_fd_limit();
Expand Down Expand Up @@ -146,26 +142,20 @@ impl Command {
.start_network()
.await?;

let (sender, receiver) = tokio::sync::mpsc::channel(64);
tokio::spawn(handle_events(stream_select(
network.event_listener().map(Into::into),
ReceiverStream::new(receiver).map(Into::into),
)));

info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");

let fetch_client = Arc::new(network.fetch_client().await?);

// Spawn headers downloader
let headers_downloader = headers::task::TaskDownloader::spawn(
let header_downloader = headers::task::TaskDownloader::spawn(
headers::linear::LinearDownloadBuilder::default()
.request_limit(config.stages.headers.downloader_batch_size)
.stream_batch_size(config.stages.headers.commit_threshold as usize)
.build(consensus.clone(), fetch_client.clone()),
);

// Spawn bodies downloader
let bodies_downloader = bodies::task::TaskDownloader::spawn(
let body_downloader = bodies::task::TaskDownloader::spawn(
bodies::concurrent::ConcurrentDownloaderBuilder::default()
.with_stream_batch_size(config.stages.bodies.downloader_stream_batch_size)
.with_request_limit(config.stages.bodies.downloader_request_limit)
Expand All @@ -177,32 +167,32 @@ impl Command {
.build(fetch_client.clone(), consensus.clone(), db.clone()),
);

let mut pipeline = reth_stages::Pipeline::default()
let mut pipeline = Pipeline::builder()
.with_sync_state_updater(network.clone())
.with_channel(sender)
.push(HeaderStage {
downloader: headers_downloader,
consensus: consensus.clone(),
sync_status_updates: network.clone(),
})
.push(TotalDifficultyStage {
commit_threshold: config.stages.total_difficulty.commit_threshold,
})
.push(BodyStage { downloader: bodies_downloader, consensus: consensus.clone() })
.push(SenderRecoveryStage {
batch_size: config.stages.sender_recovery.batch_size,
commit_threshold: config.stages.sender_recovery.commit_threshold,
})
.push(ExecutionStage {
chain_spec: self.chain.clone(),
commit_threshold: config.stages.execution.commit_threshold,
})
// This Merkle stage is used only on unwind
.push(MerkleStage { is_execute: false })
.push(AccountHashingStage { clean_threshold: 500_000, commit_threshold: 100_000 })
.push(StorageHashingStage { clean_threshold: 500_000, commit_threshold: 100_000 })
// This merkle stage is used only for execute
.push(MerkleStage { is_execute: true });
.add_stages(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uses the new stage sets: as mentioned, I want to make either the stage sets or the stages optionally ser/de to replace StagesConfig. This would make this a lot easier. Primary hurdles are:

  1. The downloaders, but it is possible to make them serializable too.
  2. Consensus. This should be determined by the chain spec, so should end up being ser/de too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love this abstraction.

OnlineStages::new(consensus.clone(), header_downloader, body_downloader).set(
TotalDifficultyStage {
commit_threshold: config.stages.total_difficulty.commit_threshold,
},
),
)
.add_stages(
OfflineStages
.set(SenderRecoveryStage {
batch_size: config.stages.sender_recovery.batch_size,
commit_threshold: config.stages.execution.commit_threshold,
})
.set(ExecutionStage {
chain_spec: self.chain,
commit_threshold: config.stages.execution.commit_threshold,
}),
)
.build();

tokio::spawn(handle_events(stream_select(
network.event_listener().map(Into::into),
pipeline.events().map(Into::into),
)));

// Run pipeline
info!(target: "reth::cli", "Starting sync pipeline");
Expand Down
2 changes: 1 addition & 1 deletion bin/reth/src/stage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use reth_net_nat::NatResolver;
use reth_primitives::ChainSpec;
use reth_staged_sync::Config;
use reth_stages::{
stages::{bodies::BodyStage, execution::ExecutionStage, sender_recovery::SenderRecoveryStage},
stages::{BodyStage, ExecutionStage, SenderRecoveryStage},
ExecInput, Stage, StageId, Transaction, UnwindInput,
};

Expand Down
2 changes: 1 addition & 1 deletion bin/reth/src/test_eth_chain/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use reth_primitives::{
StorageEntry, H256, U256,
};
use reth_rlp::Decodable;
use reth_stages::{stages::execution::ExecutionStage, ExecInput, Stage, StageId, Transaction};
use reth_stages::{stages::ExecutionStage, ExecInput, Stage, StageId, Transaction};
use std::{
collections::HashMap,
ffi::OsStr,
Expand Down
4 changes: 2 additions & 2 deletions crates/net/downloaders/src/headers/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ struct HeadersResponseError {
}
gakonst marked this conversation as resolved.
Show resolved Hide resolved

/// The block to which we want to close the gap: (local head...sync target]
#[derive(Debug)]
#[derive(Debug, Default)]
struct SyncTargetBlock {
/// Block hash of the targeted block
hash: H256,
Expand Down Expand Up @@ -943,7 +943,6 @@ mod tests {

let mut downloader = LinearDownloadBuilder::default()
.build(Arc::new(TestConsensus::default()), Arc::clone(&client));

downloader.update_local_head(genesis);
downloader.update_sync_target(SyncTarget::Tip(H256::random()));

Expand Down Expand Up @@ -1015,6 +1014,7 @@ mod tests {
.build(Arc::new(TestConsensus::default()), Arc::clone(&client));
downloader.update_local_head(genesis);
downloader.update_sync_target(SyncTarget::Tip(H256::random()));

downloader.next_request_block_number = start;

let mut total = 0;
Expand Down
7 changes: 4 additions & 3 deletions crates/net/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use reth_primitives::{ChainSpec, ForkFilter, NodeRecord, PeerId, MAINNET};
use reth_provider::{BlockProvider, HeaderProvider};
use reth_tasks::TaskExecutor;
use secp256k1::{SecretKey, SECP256K1};
use serde::{Deserialize, Serialize};
use std::{
collections::HashSet,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
Expand Down Expand Up @@ -120,7 +119,8 @@ where
}

/// Builder for [`NetworkConfig`](struct.NetworkConfig.html).
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[allow(missing_docs)]
pub struct NetworkConfigBuilder {
/// The node's secret key, from which the node's identity is derived.
Expand Down Expand Up @@ -338,7 +338,8 @@ impl NetworkConfigBuilder {
/// This affects block propagation in the `eth` sub-protocol [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)
///
/// In POS `NewBlockHashes` and `NewBlock` messages become invalid.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum NetworkMode {
/// Network is in proof-of-work mode.
Work,
Expand Down
19 changes: 13 additions & 6 deletions crates/stages/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,32 @@ reth-provider = { path = "../storage/provider" }

# async
tokio = { version = "1.21.2", features = ["sync"] }

tokio-stream = "0.1.10"
async-trait = "0.1.57"
thiserror = "1.0.37"
futures-util = "0.3.25"

# observability
tracing = "0.1.36"
aquamarine = "0.1.12"
metrics = "0.20.1"
futures-util = "0.3.25"

# misc
serde = { version = "1.0", optional = true }
thiserror = "1.0.37"
aquamarine = "0.1.12"
itertools = "0.10.5"
rayon = "1.6.0"

[dev-dependencies]
# reth
reth-db = { path = "../storage/db", features = ["test-utils", "mdbx"] }
reth-interfaces = { path = "../interfaces", features = ["test-utils"] }
reth-downloaders = { path = "../net/downloaders" }
reth-eth-wire = { path = "../net/eth-wire" } # TODO(onbjerg): We only need this for [BlockBody]
tokio = { version = "*", features = ["rt", "sync", "macros"] }
tokio-stream = "0.1.10"
tempfile = "3.3.0"
assert_matches = "1.5.0"
rand = "0.8.5"
paste = "1.0"

[features]
default = ["serde"]
serde = ["dep:serde"]
4 changes: 2 additions & 2 deletions crates/stages/src/id.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stages::{bodies::BODIES, headers::HEADERS};
use crate::stages::{BODIES, HEADERS};
use metrics::absolute_counter;
use reth_db::{
tables::SyncStage,
Expand All @@ -11,7 +11,7 @@ use std::fmt::Display;
/// The ID of a stage.
///
/// Each stage ID must be unique.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct StageId(pub &'static str);

impl Display for StageId {
Expand Down
46 changes: 41 additions & 5 deletions crates/stages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,48 @@
))]
//! Staged syncing primitives for reth.
//!
//! See [Stage] and [Pipeline].
//! This crate contains the syncing primitives [`Pipeline`] and [`Stage`], as well as all stages
//! that reth uses to sync.
//!
//! # Metrics
//! A pipeline can be configured using [`Pipeline::builder()`].
onbjerg marked this conversation as resolved.
Show resolved Hide resolved
//!
//! This library exposes metrics via the [`metrics`][metrics_core] crate:
//! For ease of use, this crate also exposes a set of [`StageSet`]s, which are collections of stages
//! that perform specific functions during sync. Stage sets can be customized; it is possible to
//! add, disable and replace stages in the set.
//!
//! - `stage_progress{stage}`: The block number each stage has currently reached.

//! # Examples
//!
//! ```
//! # use std::sync::Arc;
//! # use reth_db::mdbx::test_utils::create_test_rw_db;
//! # use reth_db::mdbx::{Env, WriteMap};
//! # use reth_downloaders::bodies::concurrent::ConcurrentDownloaderBuilder;
//! # use reth_downloaders::headers::linear::LinearDownloadBuilder;
//! # use reth_interfaces::consensus::Consensus;
//! # use reth_interfaces::sync::NoopSyncStateUpdate;
//! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient};
//! # use reth_primitives::PeerId;
//! # use reth_stages::Pipeline;
//! # use reth_stages::sets::DefaultStages;
//! # let consensus: Arc<dyn Consensus> = Arc::new(TestConsensus::default());
//! # let headers_downloader = LinearDownloadBuilder::default().build(
//! # consensus.clone(),
//! # Arc::new(TestHeadersClient::default())
//! # );
//! # let bodies_downloader = ConcurrentDownloaderBuilder::default().build(
//! # Arc::new(TestBodiesClient { responder: |_| Ok((PeerId::zero(), vec![]).into()) }),
//! # consensus.clone(),
//! # create_test_rw_db()
//! # );
//! // Create a pipeline that can fully sync
//! # let pipeline: Pipeline<Env<WriteMap>, NoopSyncStateUpdate> =
//! Pipeline::builder()
//! .add_stages(
//! DefaultStages::new(consensus, headers_downloader, bodies_downloader)
//! )
//! .build();
//! #
//! ```
mod db;
mod error;
mod id;
Expand All @@ -27,6 +61,8 @@ mod test_utils;
/// Implementations of stages.
pub mod stages;

pub mod sets;

pub use db::Transaction;
pub use error::*;
pub use id::*;
Expand Down
69 changes: 69 additions & 0 deletions crates/stages/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use crate::{pipeline::QueuedStage, Pipeline, Stage, StageSet};
use reth_db::database::Database;
use reth_interfaces::sync::{NoopSyncStateUpdate, SyncStateUpdater};
use reth_primitives::BlockNumber;

/// Builds a [`Pipeline`].
///
/// Stages are appended with [`add_stage`][PipelineBuilder::add_stage] /
/// [`add_stages`][PipelineBuilder::add_stages]; the finished pipeline is
/// obtained by calling [`build`][PipelineBuilder::build].
#[derive(Debug)]
#[must_use = "call `build` to construct the pipeline"]
pub struct PipelineBuilder<DB, U = NoopSyncStateUpdate>
where
    DB: Database,
    U: SyncStateUpdater,
{
    // The pipeline under construction. The builder methods mutate this value
    // in place and `build` returns it unchanged.
    pipeline: Pipeline<DB, U>,
}

impl<DB: Database, U: SyncStateUpdater> Default for PipelineBuilder<DB, U> {
fn default() -> Self {
Self { pipeline: Pipeline::default() }
}
}

impl<DB, U> PipelineBuilder<DB, U>
where
    DB: Database,
    U: SyncStateUpdater,
{
    /// Append a single stage to the end of the pipeline.
    pub fn add_stage<S>(mut self, stage: S) -> Self
    where
        S: Stage<DB> + 'static,
    {
        let queued = QueuedStage { stage: Box::new(stage) };
        self.pipeline.stages.push(queued);
        self
    }

    /// Append every stage in a [`StageSet`] to the pipeline, in order.
    ///
    /// To customize the stages in the set (reorder, disable, insert a stage) call
    /// [`build`][StageSet::build] on the set, which converts it into a
    /// [`StageSetBuilder`][crate::StageSetBuilder].
    pub fn add_stages<Set: StageSet<DB>>(mut self, set: Set) -> Self {
        self.pipeline
            .stages
            .extend(set.build().finish().into_iter().map(|stage| QueuedStage { stage }));
        self
    }

    /// Set the target block.
    ///
    /// Once this block is reached, the pipeline will stop.
    pub fn with_max_block(mut self, block: BlockNumber) -> Self {
        self.pipeline.max_block = Some(block);
        self
    }

    /// Set a [`SyncStateUpdater`] on the pipeline.
    pub fn with_sync_state_updater(mut self, updater: U) -> Self {
        self.pipeline.sync_state_updater = Some(updater);
        self
    }

    /// Consume the builder and return the configured [`Pipeline`].
    pub fn build(self) -> Pipeline<DB, U> {
        self.pipeline
    }
}
Loading