Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 27 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]
members = [
"bin/reth",
"crates/consensus/auto-seal",
"crates/consensus/beacon",
"crates/consensus/common",
"crates/executor",
Expand Down
25 changes: 25 additions & 0 deletions crates/consensus/auto-seal/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "reth-auto-seal-consensus"
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/paradigmxyz/reth"
readme = "README.md"
description = "A consensus impl for local testing purposes"

[dependencies]
# reth
reth-consensus-common = { path = "../common" }
reth-primitives = { path = "../../primitives" }
reth-interfaces = { path = "../../interfaces" }

# async
futures-util = "0.3"
tokio = { version = "1", features = ["sync", "time"] }
tokio-stream = { version = "0.1", feautres = ["tokio-util"] }

# misc
tracing = "0.1"

[dev-dependencies]
reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }
17 changes: 17 additions & 0 deletions crates/consensus/auto-seal/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#![warn(missing_docs, unreachable_pub, unused_crate_dependencies)]
#![deny(unused_must_use, rust_2018_idioms)]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]

//! A [Consensus](reth_interfaces::consensus::Consensus) implementation for local testing purposes
//! that automatically seals blocks.

use reth_interfaces::consensus::ForkchoiceState;

mod mode;

/// A consensus implementation that follows a strategy for announcing blocks via [ForkchoiceState]
#[derive(Debug)]
pub struct AutoSealConsensus {}
139 changes: 139 additions & 0 deletions crates/consensus/auto-seal/src/mode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
//! The mode the consensus is operating in

use futures_util::{stream::Fuse, SinkExt, StreamExt};
use reth_primitives::TxHash;
use std::{
fmt,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use tokio::{sync::mpsc::Receiver, time::Interval};
use tokio_stream::{wrappers::ReceiverStream, Stream};
use tracing::trace;

/// Mode of operations for the `Miner`
#[derive(Debug)]
pub enum MiningMode {
/// A miner that does nothing
None,
/// A miner that listens for new transactions that are ready.
///
/// Either one transaction will be mined per block, or any number of transactions will be
/// allowed
Auto(ReadyTransactionMiner),
/// A miner that constructs a new block every `interval` tick
FixedBlockTime(FixedBlockTimeMiner),
}

// === impl MiningMode ===

impl MiningMode {
pub fn instant(max_transactions: usize, listener: Receiver<TxHash>) -> Self {
MiningMode::Auto(ReadyTransactionMiner {
max_transactions,
has_pending_txs: None,
rx: ReceiverStream::new(listener).fuse(),
})
}

pub fn interval(duration: Duration) -> Self {
MiningMode::FixedBlockTime(FixedBlockTimeMiner::new(duration))
}

/// polls the [Pool] and returns those transactions that should be put in a block, if any.
pub fn poll<Pool>(
&mut self,
pool: &Pool,
cx: &mut Context<'_>,
) -> Poll<Vec<Arc<PoolTransaction>>> {
match self {
MiningMode::None => Poll::Pending,
MiningMode::Auto(miner) => miner.poll(pool, cx),
MiningMode::FixedBlockTime(miner) => miner.poll(pool, cx),
}
}
}

/// A miner that's supposed to create a new block every `interval`, mining all transactions that are
/// ready at that time.
///
/// The default blocktime is set to 6 seconds
#[derive(Debug)]
pub struct FixedBlockTimeMiner {
/// The interval this fixed block time miner operates with
interval: Interval,
}

// === impl FixedBlockTimeMiner ===

impl FixedBlockTimeMiner {
/// Creates a new instance with an interval of `duration`
pub fn new(duration: Duration) -> Self {
let start = tokio::time::Instant::now() + duration;
Self { interval: tokio::time::interval_at(start, duration) }
}

fn poll<P>(&mut self, pool: &P, cx: &mut Context<'_>) -> Poll<Vec<Arc<PoolTransaction>>>
where
P: 'static,
{
if self.interval.poll_tick(cx).is_ready() {
// drain the pool
return Poll::Ready(pool.ready_transactions().collect())
}
Poll::Pending
}
}

impl Default for FixedBlockTimeMiner {
fn default() -> Self {
Self::new(Duration::from_secs(6))
}
}

/// A miner that Listens for new ready transactions
pub struct ReadyTransactionMiner {
/// how many transactions to mine per block
max_transactions: usize,
/// stores whether there are pending transactions (if known)
has_pending_txs: Option<bool>,
/// Receives hashes of transactions that are ready
rx: Fuse<ReceiverStream<TxHash>>,
}

// === impl ReadyTransactionMiner ===

impl ReadyTransactionMiner {
fn poll<Pool>(&mut self, pool: &Pool, cx: &mut Context<'_>) -> Poll<Vec<Arc<PoolTransaction>>> {
// drain the notification stream
while let Poll::Ready(Some(_hash)) = Pin::new(&mut self.rx).poll_next(cx) {
self.has_pending_txs = Some(true);
}

if self.has_pending_txs == Some(false) {
return Poll::Pending
}

let transactions =
pool.ready_transactions().take(self.max_transactions).collect::<Vec<_>>();

// there are pending transactions if we didn't drain the pool
self.has_pending_txs = Some(transactions.len() >= self.max_transactions);

if transactions.is_empty() {
return Poll::Pending
}

Poll::Ready(transactions)
}
}

impl fmt::Debug for ReadyTransactionMiner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReadyTransactionMiner")
.field("max_transactions", &self.max_transactions)
.finish_non_exhaustive()
}
}