Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/db/src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl<E: EnvironmentKind> Env<E> {
inner: Environment::new()
.set_max_dbs(TABLES.len())
.set_geometry(Geometry {
size: Some(0..0x100000), // TODO: reevaluate
size: Some(0..0x1000000), // TODO: reevaluate
growth_step: Some(0x100000), // TODO: reevaluate
shrink_threshold: None,
page_size: Some(PageSize::Set(default_page_size())),
Expand Down
72 changes: 57 additions & 15 deletions crates/interfaces/src/test_utils/headers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Testing support for headers related interfaces.
use crate::{
consensus::{self, Consensus, Error},
consensus::{self, Consensus},
p2p::headers::{
client::{HeadersClient, HeadersRequest, HeadersResponse, HeadersStream},
downloader::HeaderDownloader,
Expand All @@ -9,20 +9,28 @@ use crate::{
};
use reth_primitives::{BlockLocked, Header, SealedHeader, H256, H512};
use reth_rpc_types::engine::ForkchoiceState;
use std::{collections::HashSet, sync::Arc, time::Duration};
use std::{
collections::HashSet,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tokio::sync::{broadcast, mpsc, watch};
use tokio_stream::{wrappers::BroadcastStream, StreamExt};

/// A test downloader which just returns the values that have been pushed to it.
#[derive(Debug)]
pub struct TestHeaderDownloader {
result: Result<Vec<SealedHeader>, DownloadError>,
client: Arc<TestHeadersClient>,
consensus: Arc<TestConsensus>,
}

impl TestHeaderDownloader {
/// Instantiates the downloader with the mock responses
pub fn new(result: Result<Vec<SealedHeader>, DownloadError>) -> Self {
Self { result }
pub fn new(client: Arc<TestHeadersClient>, consensus: Arc<TestConsensus>) -> Self {
Self { client, consensus }
}
}

Expand All @@ -36,19 +44,39 @@ impl HeaderDownloader for TestHeaderDownloader {
}

fn consensus(&self) -> &Self::Consensus {
unimplemented!()
&self.consensus
}

fn client(&self) -> &Self::Client {
unimplemented!()
&self.client
}

async fn download(
&self,
_: &SealedHeader,
_: &ForkchoiceState,
) -> Result<Vec<SealedHeader>, DownloadError> {
self.result.clone()
// call consensus stub first. fails if the flag is set
let empty = SealedHeader::default();
self.consensus
.validate_header(&empty, &empty)
.map_err(|error| DownloadError::HeaderValidation { hash: empty.hash(), error })?;

let stream = self.client.stream_headers().await;
let stream = stream.timeout(Duration::from_secs(1));

match Box::pin(stream).try_next().await {
Ok(Some(res)) => {
let mut headers = res.headers.iter().map(|h| h.clone().seal()).collect::<Vec<_>>();
if !headers.is_empty() {
headers.sort_unstable_by_key(|h| h.number);
headers.remove(0); // remove head from response
headers.reverse();
}
Ok(headers)
}
_ => Err(DownloadError::Timeout { request_id: 0 }),
}
}
}

Expand Down Expand Up @@ -93,6 +121,12 @@ impl TestHeadersClient {
pub fn send_header_response(&self, id: u64, headers: Vec<Header>) {
self.res_tx.send((id, headers).into()).expect("failed to send header response");
}

/// Helper for pushing responses to the client
pub async fn send_header_response_delayed(&self, id: u64, headers: Vec<Header>, secs: u64) {
tokio::time::sleep(Duration::from_secs(secs)).await;
self.send_header_response(id, headers);
}
}

#[async_trait::async_trait]
Expand All @@ -106,6 +140,9 @@ impl HeadersClient for TestHeadersClient {
}

async fn stream_headers(&self) -> HeadersStream {
if !self.res_rx.is_empty() {
println!("WARNING: broadcast receiver already contains messages.")
}
Box::pin(BroadcastStream::new(self.res_rx.resubscribe()).filter_map(|e| e.ok()))
}
}
Expand All @@ -116,7 +153,7 @@ pub struct TestConsensus {
/// Watcher over the forkchoice state
channel: (watch::Sender<ForkchoiceState>, watch::Receiver<ForkchoiceState>),
/// Flag whether the header validation should purposefully fail
fail_validation: bool,
fail_validation: AtomicBool,
}

impl Default for TestConsensus {
Expand All @@ -127,7 +164,7 @@ impl Default for TestConsensus {
finalized_block_hash: H256::zero(),
safe_block_hash: H256::zero(),
}),
fail_validation: false,
fail_validation: AtomicBool::new(false),
}
}
}
Expand All @@ -143,9 +180,14 @@ impl TestConsensus {
self.channel.0.send(state).expect("updating fork choice state failed");
}

/// Get the failed validation flag
pub fn fail_validation(&self) -> bool {
self.fail_validation.load(Ordering::SeqCst)
}

/// Update the validation flag
pub fn set_fail_validation(&mut self, val: bool) {
self.fail_validation = val;
pub fn set_fail_validation(&self, val: bool) {
self.fail_validation.store(val, Ordering::SeqCst)
}
}

Expand All @@ -160,15 +202,15 @@ impl Consensus for TestConsensus {
_header: &SealedHeader,
_parent: &SealedHeader,
) -> Result<(), consensus::Error> {
if self.fail_validation {
if self.fail_validation() {
Err(consensus::Error::BaseFeeMissing)
} else {
Ok(())
}
}

fn pre_validate_block(&self, _block: &BlockLocked) -> Result<(), Error> {
if self.fail_validation {
fn pre_validate_block(&self, _block: &BlockLocked) -> Result<(), consensus::Error> {
if self.fail_validation() {
Err(consensus::Error::BaseFeeMissing)
} else {
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion crates/net/headers-downloaders/src/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ mod tests {

static CONSENSUS: Lazy<Arc<TestConsensus>> = Lazy::new(|| Arc::new(TestConsensus::default()));
static CONSENSUS_FAIL: Lazy<Arc<TestConsensus>> = Lazy::new(|| {
let mut consensus = TestConsensus::default();
let consensus = TestConsensus::default();
consensus.set_fail_validation(true);
Arc::new(consensus)
});
Expand Down
3 changes: 3 additions & 0 deletions crates/stages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ mod pipeline;
mod stage;
mod util;

#[cfg(test)]
mod test_utils;

/// Implementations of stages.
pub mod stages;

Expand Down
Loading