diff --git a/Cargo.lock b/Cargo.lock index 455fb0ddc9d..004ae5b71e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2162,9 +2162,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e5317663a9089767a1ec00a487df42e0ca174b61b4483213ac24448e4664df5" +checksum = "164713a5a0dcc3e7b4b1ed7d3b433cabc18025386f9339346e8daf15963cf7ac" dependencies = [ "futures-core", "futures-sink", @@ -2172,9 +2172,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec90ff4d0fe1f57d600049061dc6bb68ed03c7d2fbd697274c41805dcb3f8608" +checksum = "86d7a0c1aa76363dac491de0ee99faf6941128376f1cf96f07db7603b7de69dd" [[package]] name = "futures-executor" @@ -2189,9 +2189,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfb8371b6fb2aeb2d280374607aeabfc99d95c72edfe51692e42d3d7f0d08531" +checksum = "89d422fa3cbe3b40dca574ab087abb5bc98258ea57eea3fd6f1fa7162c778b91" [[package]] name = "futures-locks" @@ -2205,9 +2205,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95a73af87da33b5acf53acfebdc339fe592ecf5357ac7c0a7734ab9d8c876a70" +checksum = "3eb14ed937631bd8b8b8977f2c198443447a8355b6e3ca599f38c975e5a963b6" dependencies = [ "proc-macro2 1.0.52", "quote 1.0.23", @@ -2216,15 +2216,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f310820bb3e8cfd46c80db4d7fb8353e15dfff853a127158425f31e0be6c8364" +checksum = "ec93083a4aecafb2a80a885c9de1f0ccae9dbd32c2bb54b0c3a65690e0b8d2f2" [[package]] name = "futures-task" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcf79a1bf610b10f42aea489289c5a2c478a786509693b80cd39c44ccd936366" +checksum = "fd65540d33b37b16542a0438c12e6aeead10d4ac5d05bd3f805b8f35ab592879" [[package]] name = "futures-timer" @@ -2238,9 +2238,9 @@ dependencies = [ [[package]] name = "futures-util" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c1d6de3acfef38d2be4b1f543f553131788603495be83da675e180c8d6b7bd1" +checksum = "3ef6b17e481503ec85211fed8f39d1970f128935ca1f814cd32ac4a6842e84ab" dependencies = [ "futures-channel", "futures-core", @@ -4576,6 +4576,19 @@ dependencies = [ "tui", ] +[[package]] +name = "reth-auto-seal-consensus" +version = "0.1.0" +dependencies = [ + "futures-util", + "reth-consensus-common", + "reth-interfaces", + "reth-primitives", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "reth-beacon-consensus" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 0481e20bd9e..4b607fd7104 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "bin/reth", + "crates/consensus/auto-seal", "crates/consensus/beacon", "crates/consensus/common", "crates/executor", diff --git a/crates/consensus/auto-seal/Cargo.toml b/crates/consensus/auto-seal/Cargo.toml new file mode 100644 index 00000000000..ad7668c3e8d --- /dev/null +++ b/crates/consensus/auto-seal/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "reth-auto-seal-consensus" +version = "0.1.0" +edition = "2021" +license = "MIT OR Apache-2.0" +repository = "https://github.com/paradigmxyz/reth" +readme = "README.md" +description = "A consensus impl for local testing purposes" + +[dependencies] +# reth +reth-consensus-common = { path = "../common" } +reth-primitives = { path = "../../primitives" } +reth-interfaces = { path = "../../interfaces" } + +# async +futures-util = "0.3" +tokio = { version = "1", features = ["sync", "time"] } +tokio-stream = { version = "0.1", feautres = ["tokio-util"] } + +# misc +tracing = "0.1" + +[dev-dependencies] +reth-interfaces = { path = "../../interfaces", features = ["test-utils"] } diff --git a/crates/consensus/auto-seal/src/lib.rs b/crates/consensus/auto-seal/src/lib.rs new file mode 100644 index 00000000000..07d9e65de90 --- /dev/null +++ b/crates/consensus/auto-seal/src/lib.rs @@ -0,0 +1,17 @@ +#![warn(missing_docs, unreachable_pub, unused_crate_dependencies)] +#![deny(unused_must_use, rust_2018_idioms)] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] + +//! A [Consensus](reth_interfaces::consensus::Consensus) implementation for local testing purposes +//! that automatically seals blocks. + +use reth_interfaces::consensus::ForkchoiceState; + +mod mode; + +/// A consensus implementation that follows a strategy for announcing blocks via [ForkchoiceState] +#[derive(Debug)] +pub struct AutoSealConsensus {} diff --git a/crates/consensus/auto-seal/src/mode.rs b/crates/consensus/auto-seal/src/mode.rs new file mode 100644 index 00000000000..2b8f497d9cc --- /dev/null +++ b/crates/consensus/auto-seal/src/mode.rs @@ -0,0 +1,139 @@ +//! The mode the consensus is operating in + +use futures_util::{stream::Fuse, SinkExt, StreamExt}; +use reth_primitives::TxHash; +use std::{ + fmt, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; +use tokio::{sync::mpsc::Receiver, time::Interval}; +use tokio_stream::{wrappers::ReceiverStream, Stream}; +use tracing::trace; + +/// Mode of operations for the `Miner` +#[derive(Debug)] +pub enum MiningMode { + /// A miner that does nothing + None, + /// A miner that listens for new transactions that are ready. + /// + /// Either one transaction will be mined per block, or any number of transactions will be + /// allowed + Auto(ReadyTransactionMiner), + /// A miner that constructs a new block every `interval` tick + FixedBlockTime(FixedBlockTimeMiner), +} + +// === impl MiningMode === + +impl MiningMode { + pub fn instant(max_transactions: usize, listener: Receiver) -> Self { + MiningMode::Auto(ReadyTransactionMiner { + max_transactions, + has_pending_txs: None, + rx: ReceiverStream::new(listener).fuse(), + }) + } + + pub fn interval(duration: Duration) -> Self { + MiningMode::FixedBlockTime(FixedBlockTimeMiner::new(duration)) + } + + /// polls the [Pool] and returns those transactions that should be put in a block, if any. + pub fn poll( + &mut self, + pool: &Pool, + cx: &mut Context<'_>, + ) -> Poll>> { + match self { + MiningMode::None => Poll::Pending, + MiningMode::Auto(miner) => miner.poll(pool, cx), + MiningMode::FixedBlockTime(miner) => miner.poll(pool, cx), + } + } +} + +/// A miner that's supposed to create a new block every `interval`, mining all transactions that are +/// ready at that time. +/// +/// The default blocktime is set to 6 seconds +#[derive(Debug)] +pub struct FixedBlockTimeMiner { + /// The interval this fixed block time miner operates with + interval: Interval, +} + +// === impl FixedBlockTimeMiner === + +impl FixedBlockTimeMiner { + /// Creates a new instance with an interval of `duration` + pub fn new(duration: Duration) -> Self { + let start = tokio::time::Instant::now() + duration; + Self { interval: tokio::time::interval_at(start, duration) } + } + + fn poll

(&mut self, pool: &P, cx: &mut Context<'_>) -> Poll>> + where + P: 'static, + { + if self.interval.poll_tick(cx).is_ready() { + // drain the pool + return Poll::Ready(pool.ready_transactions().collect()) + } + Poll::Pending + } +} + +impl Default for FixedBlockTimeMiner { + fn default() -> Self { + Self::new(Duration::from_secs(6)) + } +} + +/// A miner that Listens for new ready transactions +pub struct ReadyTransactionMiner { + /// how many transactions to mine per block + max_transactions: usize, + /// stores whether there are pending transactions (if known) + has_pending_txs: Option, + /// Receives hashes of transactions that are ready + rx: Fuse>, +} + +// === impl ReadyTransactionMiner === + +impl ReadyTransactionMiner { + fn poll(&mut self, pool: &Pool, cx: &mut Context<'_>) -> Poll>> { + // drain the notification stream + while let Poll::Ready(Some(_hash)) = Pin::new(&mut self.rx).poll_next(cx) { + self.has_pending_txs = Some(true); + } + + if self.has_pending_txs == Some(false) { + return Poll::Pending + } + + let transactions = + pool.ready_transactions().take(self.max_transactions).collect::>(); + + // there are pending transactions if we didn't drain the pool + self.has_pending_txs = Some(transactions.len() >= self.max_transactions); + + if transactions.is_empty() { + return Poll::Pending + } + + Poll::Ready(transactions) + } +} + +impl fmt::Debug for ReadyTransactionMiner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ReadyTransactionMiner") + .field("max_transactions", &self.max_transactions) + .finish_non_exhaustive() + } +}