Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 1 addition & 57 deletions crates/stages/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,12 +423,11 @@ pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;
#[cfg(test)]
mod tests {
use super::*;
use crate::{StageId, UnwindOutput};
use crate::{test_utils::TestStage, StageId, UnwindOutput};
use assert_matches::assert_matches;
use reth_db::mdbx::{self, test_utils, EnvKind};
use reth_interfaces::{consensus, provider::ProviderError, sync::NoopSyncStateUpdate};
use tokio_stream::StreamExt;
use utils::TestStage;

#[test]
fn record_progress_calculates_outliers() {
Expand Down Expand Up @@ -700,59 +699,4 @@ mod tests {
})))
);
}

mod utils {
use super::*;
use async_trait::async_trait;
use std::collections::VecDeque;

pub(crate) struct TestStage {
id: StageId,
exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
unwind_outputs: VecDeque<Result<UnwindOutput, StageError>>,
}

impl TestStage {
pub(crate) fn new(id: StageId) -> Self {
Self { id, exec_outputs: VecDeque::new(), unwind_outputs: VecDeque::new() }
}

pub(crate) fn add_exec(mut self, output: Result<ExecOutput, StageError>) -> Self {
self.exec_outputs.push_back(output);
self
}

pub(crate) fn add_unwind(mut self, output: Result<UnwindOutput, StageError>) -> Self {
self.unwind_outputs.push_back(output);
self
}
}

#[async_trait]
impl<DB: Database> Stage<DB> for TestStage {
fn id(&self) -> StageId {
self.id
}

async fn execute(
&mut self,
_: &mut Transaction<'_, DB>,
_input: ExecInput,
) -> Result<ExecOutput, StageError> {
self.exec_outputs
.pop_front()
.unwrap_or_else(|| panic!("Test stage {} executed too many times.", self.id))
}

async fn unwind(
&mut self,
_: &mut Transaction<'_, DB>,
_input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
self.unwind_outputs
.pop_front()
.unwrap_or_else(|| panic!("Test stage {} unwound too many times.", self.id))
}
}
}
}
16 changes: 13 additions & 3 deletions crates/stages/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,24 @@
use crate::StageId;

mod macros;
mod runner;
mod test_db;

pub(crate) use macros::*;

mod runner;
pub(crate) use runner::{
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, UnwindStageTestRunner,
};

mod test_db;
pub use test_db::TestTransaction;

mod stage;
pub use stage::TestStage;

mod set;
pub use set::TestStages;

/// The test stage id
pub const TEST_STAGE_ID: StageId = StageId("TestStage");

/// The previous test stage id mock used for testing
pub(crate) const PREV_STAGE_ID: StageId = StageId("PrevStage");
29 changes: 29 additions & 0 deletions crates/stages/src/test_utils/set.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use super::{TestStage, TEST_STAGE_ID};
use crate::{ExecOutput, StageError, StageSet, StageSetBuilder, UnwindOutput};
use reth_db::database::Database;
use std::collections::VecDeque;

#[derive(Default, Debug)]
pub struct TestStages {
exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
unwind_outputs: VecDeque<Result<UnwindOutput, StageError>>,
}

impl TestStages {
pub fn new(
exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
unwind_outputs: VecDeque<Result<UnwindOutput, StageError>>,
) -> Self {
Self { exec_outputs, unwind_outputs }
}
}

impl<DB: Database> StageSet<DB> for TestStages {
fn builder(self) -> StageSetBuilder<DB> {
StageSetBuilder::default().add_stage(
TestStage::new(TEST_STAGE_ID)
.with_exec(self.exec_outputs)
.with_unwind(self.unwind_outputs),
)
}
}
67 changes: 67 additions & 0 deletions crates/stages/src/test_utils/stage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use reth_db::database::Database;
use reth_provider::Transaction;
use std::collections::VecDeque;

#[derive(Debug)]
pub struct TestStage {
id: StageId,
exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
unwind_outputs: VecDeque<Result<UnwindOutput, StageError>>,
}

impl TestStage {
pub fn new(id: StageId) -> Self {
Self { id, exec_outputs: VecDeque::new(), unwind_outputs: VecDeque::new() }
}

pub fn with_exec(mut self, exec_outputs: VecDeque<Result<ExecOutput, StageError>>) -> Self {
self.exec_outputs = exec_outputs;
self
}

pub fn with_unwind(
mut self,
unwind_outputs: VecDeque<Result<UnwindOutput, StageError>>,
) -> Self {
self.unwind_outputs = unwind_outputs;
self
}

pub fn add_exec(mut self, output: Result<ExecOutput, StageError>) -> Self {
self.exec_outputs.push_back(output);
self
}

pub fn add_unwind(mut self, output: Result<UnwindOutput, StageError>) -> Self {
self.unwind_outputs.push_back(output);
self
}
}

#[async_trait::async_trait]
impl<DB: Database> Stage<DB> for TestStage {
fn id(&self) -> StageId {
self.id
}

async fn execute(
&mut self,
_: &mut Transaction<'_, DB>,
_input: ExecInput,
) -> Result<ExecOutput, StageError> {
self.exec_outputs
.pop_front()
.unwrap_or_else(|| panic!("Test stage {} executed too many times.", self.id))
}

async fn unwind(
&mut self,
_: &mut Transaction<'_, DB>,
_input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
self.unwind_outputs
.pop_front()
.unwrap_or_else(|| panic!("Test stage {} unwound too many times.", self.id))
}
}