From 86cc6d331affc563d1465a3808474fc97ef9d1c6 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 17 Mar 2023 13:50:48 +0100 Subject: [PATCH 1/4] feat: add auto-seal consensus impl --- Cargo.lock | 41 +++++--- Cargo.toml | 1 + crates/consensus/auto-seal/Cargo.toml | 25 +++++ crates/consensus/auto-seal/src/lib.rs | 17 +++ crates/consensus/auto-seal/src/mode.rs | 139 +++++++++++++++++++++++++ 5 files changed, 209 insertions(+), 14 deletions(-) create mode 100644 crates/consensus/auto-seal/Cargo.toml create mode 100644 crates/consensus/auto-seal/src/lib.rs create mode 100644 crates/consensus/auto-seal/src/mode.rs diff --git a/Cargo.lock b/Cargo.lock index 455fb0ddc9d..004ae5b71e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2162,9 +2162,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e5317663a9089767a1ec00a487df42e0ca174b61b4483213ac24448e4664df5" +checksum = "164713a5a0dcc3e7b4b1ed7d3b433cabc18025386f9339346e8daf15963cf7ac" dependencies = [ "futures-core", "futures-sink", @@ -2172,9 +2172,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec90ff4d0fe1f57d600049061dc6bb68ed03c7d2fbd697274c41805dcb3f8608" +checksum = "86d7a0c1aa76363dac491de0ee99faf6941128376f1cf96f07db7603b7de69dd" [[package]] name = "futures-executor" @@ -2189,9 +2189,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfb8371b6fb2aeb2d280374607aeabfc99d95c72edfe51692e42d3d7f0d08531" +checksum = "89d422fa3cbe3b40dca574ab087abb5bc98258ea57eea3fd6f1fa7162c778b91" [[package]] name = "futures-locks" @@ -2205,9 +2205,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95a73af87da33b5acf53acfebdc339fe592ecf5357ac7c0a7734ab9d8c876a70" +checksum = "3eb14ed937631bd8b8b8977f2c198443447a8355b6e3ca599f38c975e5a963b6" dependencies = [ "proc-macro2 1.0.52", "quote 1.0.23", @@ -2216,15 +2216,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f310820bb3e8cfd46c80db4d7fb8353e15dfff853a127158425f31e0be6c8364" +checksum = "ec93083a4aecafb2a80a885c9de1f0ccae9dbd32c2bb54b0c3a65690e0b8d2f2" [[package]] name = "futures-task" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcf79a1bf610b10f42aea489289c5a2c478a786509693b80cd39c44ccd936366" +checksum = "fd65540d33b37b16542a0438c12e6aeead10d4ac5d05bd3f805b8f35ab592879" [[package]] name = "futures-timer" @@ -2238,9 +2238,9 @@ dependencies = [ [[package]] name = "futures-util" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c1d6de3acfef38d2be4b1f543f553131788603495be83da675e180c8d6b7bd1" +checksum = "3ef6b17e481503ec85211fed8f39d1970f128935ca1f814cd32ac4a6842e84ab" dependencies = [ "futures-channel", "futures-core", @@ -4576,6 +4576,19 @@ dependencies = [ "tui", ] +[[package]] +name = "reth-auto-seal-consensus" +version = "0.1.0" +dependencies = [ + "futures-util", + "reth-consensus-common", + "reth-interfaces", + "reth-primitives", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "reth-beacon-consensus" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 0481e20bd9e..4b607fd7104 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "bin/reth", + "crates/consensus/auto-seal", "crates/consensus/beacon", "crates/consensus/common", "crates/executor", diff --git a/crates/consensus/auto-seal/Cargo.toml b/crates/consensus/auto-seal/Cargo.toml new file mode 100644 index 00000000000..ad7668c3e8d --- /dev/null +++ b/crates/consensus/auto-seal/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "reth-auto-seal-consensus" +version = "0.1.0" +edition = "2021" +license = "MIT OR Apache-2.0" +repository = "https://github.com/paradigmxyz/reth" +readme = "README.md" +description = "A consensus impl for local testing purposes" + +[dependencies] +# reth +reth-consensus-common = { path = "../common" } +reth-primitives = { path = "../../primitives" } +reth-interfaces = { path = "../../interfaces" } + +# async +futures-util = "0.3" +tokio = { version = "1", features = ["sync", "time"] } +tokio-stream = { version = "0.1", feautres = ["tokio-util"] } + +# misc +tracing = "0.1" + +[dev-dependencies] +reth-interfaces = { path = "../../interfaces", features = ["test-utils"] } diff --git a/crates/consensus/auto-seal/src/lib.rs b/crates/consensus/auto-seal/src/lib.rs new file mode 100644 index 00000000000..07d9e65de90 --- /dev/null +++ b/crates/consensus/auto-seal/src/lib.rs @@ -0,0 +1,17 @@ +#![warn(missing_docs, unreachable_pub, unused_crate_dependencies)] +#![deny(unused_must_use, rust_2018_idioms)] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] + +//! A [Consensus](reth_interfaces::consensus::Consensus) implementation for local testing purposes +//! that automatically seals blocks. + +use reth_interfaces::consensus::ForkchoiceState; + +mod mode; + +/// A consensus implementation that follows a strategy for announcing blocks via [ForkchoiceState] +#[derive(Debug)] +pub struct AutoSealConsensus {} diff --git a/crates/consensus/auto-seal/src/mode.rs b/crates/consensus/auto-seal/src/mode.rs new file mode 100644 index 00000000000..2b8f497d9cc --- /dev/null +++ b/crates/consensus/auto-seal/src/mode.rs @@ -0,0 +1,139 @@ +//! The mode the consensus is operating in + +use futures_util::{stream::Fuse, SinkExt, StreamExt}; +use reth_primitives::TxHash; +use std::{ + fmt, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; +use tokio::{sync::mpsc::Receiver, time::Interval}; +use tokio_stream::{wrappers::ReceiverStream, Stream}; +use tracing::trace; + +/// Mode of operations for the `Miner` +#[derive(Debug)] +pub enum MiningMode { + /// A miner that does nothing + None, + /// A miner that listens for new transactions that are ready. + /// + /// Either one transaction will be mined per block, or any number of transactions will be + /// allowed + Auto(ReadyTransactionMiner), + /// A miner that constructs a new block every `interval` tick + FixedBlockTime(FixedBlockTimeMiner), +} + +// === impl MiningMode === + +impl MiningMode { + pub fn instant(max_transactions: usize, listener: Receiver) -> Self { + MiningMode::Auto(ReadyTransactionMiner { + max_transactions, + has_pending_txs: None, + rx: ReceiverStream::new(listener).fuse(), + }) + } + + pub fn interval(duration: Duration) -> Self { + MiningMode::FixedBlockTime(FixedBlockTimeMiner::new(duration)) + } + + /// polls the [Pool] and returns those transactions that should be put in a block, if any. + pub fn poll( + &mut self, + pool: &Pool, + cx: &mut Context<'_>, + ) -> Poll>> { + match self { + MiningMode::None => Poll::Pending, + MiningMode::Auto(miner) => miner.poll(pool, cx), + MiningMode::FixedBlockTime(miner) => miner.poll(pool, cx), + } + } +} + +/// A miner that's supposed to create a new block every `interval`, mining all transactions that are +/// ready at that time. +/// +/// The default blocktime is set to 6 seconds +#[derive(Debug)] +pub struct FixedBlockTimeMiner { + /// The interval this fixed block time miner operates with + interval: Interval, +} + +// === impl FixedBlockTimeMiner === + +impl FixedBlockTimeMiner { + /// Creates a new instance with an interval of `duration` + pub fn new(duration: Duration) -> Self { + let start = tokio::time::Instant::now() + duration; + Self { interval: tokio::time::interval_at(start, duration) } + } + + fn poll

(&mut self, pool: &P, cx: &mut Context<'_>) -> Poll>> + where + P: 'static, + { + if self.interval.poll_tick(cx).is_ready() { + // drain the pool + return Poll::Ready(pool.ready_transactions().collect()) + } + Poll::Pending + } +} + +impl Default for FixedBlockTimeMiner { + fn default() -> Self { + Self::new(Duration::from_secs(6)) + } +} + +/// A miner that Listens for new ready transactions +pub struct ReadyTransactionMiner { + /// how many transactions to mine per block + max_transactions: usize, + /// stores whether there are pending transactions (if known) + has_pending_txs: Option, + /// Receives hashes of transactions that are ready + rx: Fuse>, +} + +// === impl ReadyTransactionMiner === + +impl ReadyTransactionMiner { + fn poll(&mut self, pool: &Pool, cx: &mut Context<'_>) -> Poll>> { + // drain the notification stream + while let Poll::Ready(Some(_hash)) = Pin::new(&mut self.rx).poll_next(cx) { + self.has_pending_txs = Some(true); + } + + if self.has_pending_txs == Some(false) { + return Poll::Pending + } + + let transactions = + pool.ready_transactions().take(self.max_transactions).collect::>(); + + // there are pending transactions if we didn't drain the pool + self.has_pending_txs = Some(transactions.len() >= self.max_transactions); + + if transactions.is_empty() { + return Poll::Pending + } + + Poll::Ready(transactions) + } +} + +impl fmt::Debug for ReadyTransactionMiner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ReadyTransactionMiner") + .field("max_transactions", &self.max_transactions) + .finish_non_exhaustive() + } +} From c186ee102514b48e99586233cc03c33c5a58da98 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Mon, 20 Mar 2023 20:32:03 -0400 Subject: [PATCH 2/4] stub headers and bodies clients --- crates/consensus/auto-seal/src/mode.rs | 37 ++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/crates/consensus/auto-seal/src/mode.rs b/crates/consensus/auto-seal/src/mode.rs index 2b8f497d9cc..fec29b8748a 100644 --- a/crates/consensus/auto-seal/src/mode.rs +++ b/crates/consensus/auto-seal/src/mode.rs @@ -1,7 +1,8 @@ //! The mode the consensus is operating in use futures_util::{stream::Fuse, SinkExt, StreamExt}; -use reth_primitives::TxHash; +use reth_interfaces::p2p::{headers::client::{HeadersClient, HeadersFut, HeadersRequest}, priority::Priority, download::DownloadClient, bodies::client::{BodiesClient, BodiesFut}}; +use reth_primitives::{TxHash, PeerId, H256}; use std::{ fmt, pin::Pin, @@ -11,7 +12,7 @@ use std::{ }; use tokio::{sync::mpsc::Receiver, time::Interval}; use tokio_stream::{wrappers::ReceiverStream, Stream}; -use tracing::trace; +use tracing::{trace, warn}; /// Mode of operations for the `Miner` #[derive(Debug)] @@ -137,3 +138,35 @@ impl fmt::Debug for ReadyTransactionMiner { .finish_non_exhaustive() } } + +impl HeadersClient for ReadyTransactionMiner { + type Output = HeadersFut; + + fn get_headers_with_priority( + &self, + request: HeadersRequest, + _priority: Priority, + ) -> Self::Output { + todo!() + } +} + +impl BodiesClient for ReadyTransactionMiner { + type Output = BodiesFut; + + fn get_block_bodies_with_priority(&self, hashes: Vec, priority: Priority) -> Self::Output { + todo!() + } +} + +impl DownloadClient for ReadyTransactionMiner { + fn report_bad_message(&self, _peer_id: PeerId) { + warn!("Reported a bad message on a miner, we should never produce bad blocks"); + // noop + } + + fn num_connected_peers(&self) -> usize { + // no such thing as connected peers when we are mining ourselves + 1 + } +} From 7d20f6defbb002d88273f3f57292a45323efcad7 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Mon, 20 Mar 2023 21:27:52 -0400 Subject: [PATCH 3/4] impl more of autosealclient --- Cargo.lock | 1 + crates/consensus/auto-seal/Cargo.toml | 3 +- crates/consensus/auto-seal/src/lib.rs | 1 + .../consensus/auto-seal/src/miner_client.rs | 108 ++++++++++++++++++ crates/consensus/auto-seal/src/mode.rs | 37 +----- 5 files changed, 114 insertions(+), 36 deletions(-) create mode 100644 crates/consensus/auto-seal/src/miner_client.rs diff --git a/Cargo.lock b/Cargo.lock index 004ae5b71e2..d2a3f714c8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4584,6 +4584,7 @@ dependencies = [ "reth-consensus-common", "reth-interfaces", "reth-primitives", + "reth-transaction-pool", "tokio", "tokio-stream", "tracing", diff --git a/crates/consensus/auto-seal/Cargo.toml b/crates/consensus/auto-seal/Cargo.toml index ad7668c3e8d..96ad870b8ab 100644 --- a/crates/consensus/auto-seal/Cargo.toml +++ b/crates/consensus/auto-seal/Cargo.toml @@ -8,10 +8,11 @@ readme = "README.md" description = "A consensus impl for local testing purposes" [dependencies] -# reth +# reth reth-consensus-common = { path = "../common" } reth-primitives = { path = "../../primitives" } reth-interfaces = { path = "../../interfaces" } +reth-transaction-pool = { path = "../../transaction-pool" } # async futures-util = "0.3" diff --git a/crates/consensus/auto-seal/src/lib.rs b/crates/consensus/auto-seal/src/lib.rs index 07d9e65de90..7a1ac9fab86 100644 --- a/crates/consensus/auto-seal/src/lib.rs +++ b/crates/consensus/auto-seal/src/lib.rs @@ -10,6 +10,7 @@ use reth_interfaces::consensus::ForkchoiceState; +mod miner_client; mod mode; /// A consensus implementation that follows a strategy for announcing blocks via [ForkchoiceState] diff --git a/crates/consensus/auto-seal/src/miner_client.rs b/crates/consensus/auto-seal/src/miner_client.rs new file mode 100644 index 00000000000..6da9f487bf1 --- /dev/null +++ b/crates/consensus/auto-seal/src/miner_client.rs @@ -0,0 +1,108 @@ +//! This includes download client implementations for auto sealing miners. +//! +//! Auto sealing miners can be polled, and will return a list of transactions that are ready to be +//! mined. +//! +//! TODO: this polls the miners, so maybe the consensus should also be implemented here? +//! +//! These downloaders poll the miner, assemble the block, and return transactions that are ready to +//! be mined. +use std::{ + collections::HashMap, + fmt::Debug, + pin::Pin, + task::{Context, Poll}, +}; + +use crate::mode::MiningMode; +use futures_util::Future; +use reth_interfaces::p2p::{ + bodies::client::{BodiesClient, BodiesFut}, + download::DownloadClient, + headers::client::{HeadersClient, HeadersFut, HeadersRequest}, + priority::Priority, +}; +// need to move this to primitives!!! +use reth_eth_wire::BlockBody; +use reth_primitives::{BlockHash, BlockNumber, Header, PeerId, H256}; +use reth_transaction_pool::TransactionPool; +use tracing::warn; + +/// A download client that polls the miner for transactions and assembles blocks to be returned in +/// the download process. +/// +/// When polled, the miner will assemble blocks when miners produce ready transactions and store the +/// blocks in memory. +#[derive(Debug)] +pub struct AutoSealClient { + /// The active miner + miner: MiningMode, + + /// Headers buffered for download. + headers: HashMap, + + /// A mapping between block hash and number. + hash_to_number: HashMap, + + /// Bodies buffered for download. + bodies: HashMap, + + /// The mempool to poll for new ready transactions. + pool: Pool, +} + +impl Future for AutoSealClient { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + while let Poll::Ready(txs) = Pin::new(&mut self.miner).poll(&self.pool, cx) { + todo!("assemble block and put the header / body in maps") + } + Poll::Pending + } +} + +impl HeadersClient for AutoSealClient +where + Pool: TransactionPool + Clone + 'static + Debug, +{ + type Output = HeadersFut; + + fn get_headers_with_priority( + &self, + request: HeadersRequest, + _priority: Priority, + ) -> Self::Output { + todo!("return from map") + } +} + +impl BodiesClient for AutoSealClient +where + Pool: TransactionPool + Clone + 'static + Debug, +{ + type Output = BodiesFut; + + fn get_block_bodies_with_priority( + &self, + hashes: Vec, + priority: Priority, + ) -> Self::Output { + todo!("return from map") + } +} + +impl DownloadClient for AutoSealClient +where + Pool: TransactionPool + Clone + 'static + Debug, +{ + fn report_bad_message(&self, _peer_id: PeerId) { + warn!("Reported a bad message on a miner, we should never produce bad blocks"); + // noop + } + + fn num_connected_peers(&self) -> usize { + // no such thing as connected peers when we are mining ourselves + 1 + } +} diff --git a/crates/consensus/auto-seal/src/mode.rs b/crates/consensus/auto-seal/src/mode.rs index fec29b8748a..2b8f497d9cc 100644 --- a/crates/consensus/auto-seal/src/mode.rs +++ b/crates/consensus/auto-seal/src/mode.rs @@ -1,8 +1,7 @@ //! The mode the consensus is operating in use futures_util::{stream::Fuse, SinkExt, StreamExt}; -use reth_interfaces::p2p::{headers::client::{HeadersClient, HeadersFut, HeadersRequest}, priority::Priority, download::DownloadClient, bodies::client::{BodiesClient, BodiesFut}}; -use reth_primitives::{TxHash, PeerId, H256}; +use reth_primitives::TxHash; use std::{ fmt, pin::Pin, @@ -12,7 +11,7 @@ use std::{ }; use tokio::{sync::mpsc::Receiver, time::Interval}; use tokio_stream::{wrappers::ReceiverStream, Stream}; -use tracing::{trace, warn}; +use tracing::trace; /// Mode of operations for the `Miner` #[derive(Debug)] @@ -138,35 +137,3 @@ impl fmt::Debug for ReadyTransactionMiner { .finish_non_exhaustive() } } - -impl HeadersClient for ReadyTransactionMiner { - type Output = HeadersFut; - - fn get_headers_with_priority( - &self, - request: HeadersRequest, - _priority: Priority, - ) -> Self::Output { - todo!() - } -} - -impl BodiesClient for ReadyTransactionMiner { - type Output = BodiesFut; - - fn get_block_bodies_with_priority(&self, hashes: Vec, priority: Priority) -> Self::Output { - todo!() - } -} - -impl DownloadClient for ReadyTransactionMiner { - fn report_bad_message(&self, _peer_id: PeerId) { - warn!("Reported a bad message on a miner, we should never produce bad blocks"); - // noop - } - - fn num_connected_peers(&self) -> usize { - // no such thing as connected peers when we are mining ourselves - 1 - } -} From 283b56f46352564a1d671ea41cad442c0c9f9b21 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Mon, 20 Mar 2023 22:47:57 -0400 Subject: [PATCH 4/4] change BlockBody import --- crates/consensus/auto-seal/src/miner_client.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/consensus/auto-seal/src/miner_client.rs b/crates/consensus/auto-seal/src/miner_client.rs index 6da9f487bf1..5ab4016b707 100644 --- a/crates/consensus/auto-seal/src/miner_client.rs +++ b/crates/consensus/auto-seal/src/miner_client.rs @@ -22,9 +22,7 @@ use reth_interfaces::p2p::{ headers::client::{HeadersClient, HeadersFut, HeadersRequest}, priority::Priority, }; -// need to move this to primitives!!! -use reth_eth_wire::BlockBody; -use reth_primitives::{BlockHash, BlockNumber, Header, PeerId, H256}; +use reth_primitives::{BlockHash, BlockNumber, Header, PeerId, H256, BlockBody}; use reth_transaction_pool::TransactionPool; use tracing::warn;