Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions bin/reth/src/debug_cmd/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl Command {

let db = Arc::new(init_db(db_path)?);
let factory = ProviderFactory::new(&db, self.chain.clone());
let mut provider_rw = factory.provider_rw().map_err(PipelineError::Interface)?;
let provider_rw = factory.provider_rw().map_err(PipelineError::Interface)?;

let execution_checkpoint_block =
provider_rw.get_stage_checkpoint(StageId::Execution)?.unwrap_or_default().block_number;
Expand Down Expand Up @@ -110,7 +110,7 @@ impl Command {

execution_stage
.execute(
&mut provider_rw,
&provider_rw,
ExecInput {
target: Some(block),
checkpoint: block.checked_sub(1).map(StageCheckpoint::new),
Expand All @@ -122,7 +122,7 @@ impl Command {
while !account_hashing_done {
let output = account_hashing_stage
.execute(
&mut provider_rw,
&provider_rw,
ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
Expand All @@ -136,7 +136,7 @@ impl Command {
while !storage_hashing_done {
let output = storage_hashing_stage
.execute(
&mut provider_rw,
&provider_rw,
ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
Expand All @@ -148,7 +148,7 @@ impl Command {

let incremental_result = merkle_stage
.execute(
&mut provider_rw,
&provider_rw,
ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
Expand All @@ -171,7 +171,7 @@ impl Command {

let clean_input = ExecInput { target: Some(block), checkpoint: None };
loop {
let clean_result = merkle_stage.execute(&mut provider_rw, clean_input).await;
let clean_result = merkle_stage.execute(&provider_rw, clean_input).await;
assert!(clean_result.is_ok(), "Clean state root calculation failed");
if clean_result.unwrap().done {
break
Expand Down
8 changes: 4 additions & 4 deletions bin/reth/src/stage/dump/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ async fn unwind_and_copy<DB: Database>(
output_db: &reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
) -> eyre::Result<()> {
let factory = ProviderFactory::new(db_tool.db, db_tool.chain.clone());
let mut provider = factory.provider_rw()?;
let provider = factory.provider_rw()?;

let mut exec_stage = ExecutionStage::new_with_factory(Factory::new(db_tool.chain.clone()));

exec_stage
.unwind(
&mut provider,
&provider,
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
Expand Down Expand Up @@ -130,12 +130,12 @@ async fn dry_run<DB: Database>(
info!(target: "reth::cli", "Executing stage. [dry-run]");

let factory = ProviderFactory::new(&output_db, chain.clone());
let mut provider = factory.provider_rw()?;
let provider = factory.provider_rw()?;
let mut exec_stage = ExecutionStage::new_with_factory(Factory::new(chain.clone()));

exec_stage
.execute(
&mut provider,
&provider,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
Expand Down
8 changes: 4 additions & 4 deletions bin/reth/src/stage/dump/hashing_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ async fn unwind_and_copy<DB: Database>(
output_db: &reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
) -> eyre::Result<()> {
let factory = ProviderFactory::new(db_tool.db, db_tool.chain.clone());
let mut provider = factory.provider_rw()?;
let provider = factory.provider_rw()?;
let mut exec_stage = AccountHashingStage::default();

exec_stage
.unwind(
&mut provider,
&provider,
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
Expand All @@ -69,7 +69,7 @@ async fn dry_run<DB: Database>(
info!(target: "reth::cli", "Executing stage.");

let factory = ProviderFactory::new(&output_db, chain);
let mut provider = factory.provider_rw()?;
let provider = factory.provider_rw()?;
let mut exec_stage = AccountHashingStage {
clean_threshold: 1, // Forces hashing from scratch
..Default::default()
Expand All @@ -79,7 +79,7 @@ async fn dry_run<DB: Database>(
while !exec_output {
exec_output = exec_stage
.execute(
&mut provider,
&provider,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
Expand Down
8 changes: 4 additions & 4 deletions bin/reth/src/stage/dump/hashing_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ async fn unwind_and_copy<DB: Database>(
output_db: &reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
) -> eyre::Result<()> {
let factory = ProviderFactory::new(db_tool.db, db_tool.chain.clone());
let mut provider = factory.provider_rw()?;
let provider = factory.provider_rw()?;

let mut exec_stage = StorageHashingStage::default();

exec_stage
.unwind(
&mut provider,
&provider,
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
Expand Down Expand Up @@ -68,7 +68,7 @@ async fn dry_run<DB: Database>(
info!(target: "reth::cli", "Executing stage.");

let factory = ProviderFactory::new(&output_db, chain);
let mut provider = factory.provider_rw()?;
let provider = factory.provider_rw()?;
let mut exec_stage = StorageHashingStage {
clean_threshold: 1, // Forces hashing from scratch
..Default::default()
Expand All @@ -78,7 +78,7 @@ async fn dry_run<DB: Database>(
while !exec_output {
exec_output = exec_stage
.execute(
&mut provider,
&provider,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
Expand Down
18 changes: 9 additions & 9 deletions bin/reth/src/stage/dump/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async fn unwind_and_copy<DB: Database>(
) -> eyre::Result<()> {
let (from, to) = range;
let factory = ProviderFactory::new(db_tool.db, db_tool.chain.clone());
let mut provider = factory.provider_rw()?;
let provider = factory.provider_rw()?;

let unwind = UnwindInput {
unwind_to: from,
Expand All @@ -61,10 +61,10 @@ async fn unwind_and_copy<DB: Database>(

// Unwind hashes all the way to FROM

StorageHashingStage::default().unwind(&mut provider, unwind).await.unwrap();
AccountHashingStage::default().unwind(&mut provider, unwind).await.unwrap();
StorageHashingStage::default().unwind(&provider, unwind).await.unwrap();
AccountHashingStage::default().unwind(&provider, unwind).await.unwrap();

MerkleStage::default_unwind().unwind(&mut provider, unwind).await?;
MerkleStage::default_unwind().unwind(&provider, unwind).await?;

// Bring Plainstate to TO (hashing stage execution requires it)
let mut exec_stage = ExecutionStage::new(
Expand All @@ -74,7 +74,7 @@ async fn unwind_and_copy<DB: Database>(

exec_stage
.unwind(
&mut provider,
&provider,
UnwindInput {
unwind_to: to,
checkpoint: StageCheckpoint::new(tip_block_number),
Expand All @@ -86,11 +86,11 @@ async fn unwind_and_copy<DB: Database>(
// Bring hashes to TO

AccountHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
.execute(&mut provider, execute_input)
.execute(&provider, execute_input)
.await
.unwrap();
StorageHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
.execute(&mut provider, execute_input)
.execute(&provider, execute_input)
.await
.unwrap();

Expand All @@ -116,7 +116,7 @@ async fn dry_run<DB: Database>(
) -> eyre::Result<()> {
info!(target: "reth::cli", "Executing stage.");
let factory = ProviderFactory::new(&output_db, chain);
let mut provider = factory.provider_rw()?;
let provider = factory.provider_rw()?;
let mut exec_output = false;
while !exec_output {
exec_output = MerkleStage::Execution {
Expand All @@ -125,7 +125,7 @@ async fn dry_run<DB: Database>(
* scratch */
}
.execute(
&mut provider,
&provider,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl Command {

if !self.skip_unwind {
while unwind.checkpoint.block_number > self.from {
let unwind_output = unwind_stage.unwind(&mut provider_rw, unwind).await?;
let unwind_output = unwind_stage.unwind(&provider_rw, unwind).await?;
unwind.checkpoint = unwind_output.checkpoint;

if self.commit {
Expand All @@ -245,7 +245,7 @@ impl Command {
};

while let ExecOutput { checkpoint: stage_progress, done: false } =
exec_stage.execute(&mut provider_rw, input).await?
exec_stage.execute(&provider_rw, input).await?
{
input.checkpoint = Some(stage_progress);

Expand Down
4 changes: 2 additions & 2 deletions crates/stages/benches/criterion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ fn measure_stage_with_path<F, S>(
|_| async {
let mut stage = stage.clone();
let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap();
stage.execute(&mut provider, input).await.unwrap();
let provider = factory.provider_rw().unwrap();
stage.execute(&provider, input).await.unwrap();
provider.commit().unwrap();
},
)
Expand Down
4 changes: 2 additions & 2 deletions crates/stages/benches/setup/account_hashing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) {
std::fs::create_dir_all(&path).unwrap();
println!("Account Hashing testdata not found, generating to {:?}", path.display());
let tx = TestTransaction::new(&path);
let mut provider = tx.inner_rw();
let _accounts = AccountHashingStage::seed(&mut provider, opts);
let provider = tx.inner_rw();
let _accounts = AccountHashingStage::seed(&provider, opts);
provider.commit().expect("failed to commit");
}
(path, (ExecInput { target: Some(num_blocks), ..Default::default() }, UnwindInput::default()))
Expand Down
16 changes: 8 additions & 8 deletions crates/stages/benches/setup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ pub(crate) fn stage_unwind<S: Clone + Stage<Env<WriteMap>>>(
tokio::runtime::Runtime::new().unwrap().block_on(async {
let mut stage = stage.clone();
let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap();
let provider = factory.provider_rw().unwrap();

// Clear previous run
stage
.unwind(&mut provider, unwind)
.unwind(&provider, unwind)
.await
.map_err(|e| {
format!(
Expand All @@ -70,16 +70,16 @@ pub(crate) fn unwind_hashes<S: Clone + Stage<Env<WriteMap>>>(
tokio::runtime::Runtime::new().unwrap().block_on(async {
let mut stage = stage.clone();
let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap();
let provider = factory.provider_rw().unwrap();

StorageHashingStage::default().unwind(&mut provider, unwind).await.unwrap();
AccountHashingStage::default().unwind(&mut provider, unwind).await.unwrap();
StorageHashingStage::default().unwind(&provider, unwind).await.unwrap();
AccountHashingStage::default().unwind(&provider, unwind).await.unwrap();

// Clear previous run
stage.unwind(&mut provider, unwind).await.unwrap();
stage.unwind(&provider, unwind).await.unwrap();

AccountHashingStage::default().execute(&mut provider, input).await.unwrap();
StorageHashingStage::default().execute(&mut provider, input).await.unwrap();
AccountHashingStage::default().execute(&provider, input).await.unwrap();
StorageHashingStage::default().execute(&provider, input).await.unwrap();

provider.commit().unwrap();
});
Expand Down
4 changes: 2 additions & 2 deletions crates/stages/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ where
let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
self.listeners.notify(PipelineEvent::Unwinding { stage_id, input });

let output = stage.unwind(&mut provider_rw, input).await;
let output = stage.unwind(&provider_rw, input).await;
match output {
Ok(unwind_output) => {
checkpoint = unwind_output.checkpoint;
Expand Down Expand Up @@ -358,7 +358,7 @@ where
});

match stage
.execute(&mut provider_rw, ExecInput { target, checkpoint: prev_checkpoint })
.execute(&provider_rw, ExecInput { target, checkpoint: prev_checkpoint })
.await
{
Ok(out @ ExecOutput { checkpoint, done }) => {
Expand Down
4 changes: 2 additions & 2 deletions crates/stages/src/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,14 @@ pub trait Stage<DB: Database>: Send + Sync {
/// Execute the stage.
async fn execute(
&mut self,
provider: &mut DatabaseProviderRW<'_, &DB>,
provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError>;

/// Unwind the stage.
async fn unwind(
&mut self,
provider: &mut DatabaseProviderRW<'_, &DB>,
provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError>;
}
4 changes: 2 additions & 2 deletions crates/stages/src/stages/bodies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
/// header, limited by the stage's batch size.
async fn execute(
&mut self,
provider: &mut DatabaseProviderRW<'_, &DB>,
provider: &DatabaseProviderRW<'_, &DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
if input.target_reached() {
Expand Down Expand Up @@ -163,7 +163,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
/// Unwind the stage.
async fn unwind(
&mut self,
provider: &mut DatabaseProviderRW<'_, &DB>,
provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let tx = provider.tx_ref();
Expand Down
Loading