Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 5 additions & 61 deletions crates/stages/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use crate::{error::*, util::opt, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput};
use metrics::Gauge;
use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput};
use reth_db::database::Database;
use reth_interfaces::sync::{SyncState, SyncStateUpdater};
use reth_metrics_derive::Metrics;
use reth_primitives::{BlockNumber, H256};
use reth_provider::Transaction;
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
ops::Deref,
sync::Arc,
Expand All @@ -18,12 +15,16 @@ use tracing::*;
mod builder;
mod ctrl;
mod event;
mod progress;
mod set;
mod sync_metrics;

pub use builder::*;
use ctrl::*;
pub use event::*;
use progress::*;
pub use set::*;
use sync_metrics::*;

#[cfg_attr(doc, aquamarine::aquamarine)]
/// A staged sync pipeline.
Expand Down Expand Up @@ -368,63 +369,6 @@ impl<DB: Database, U: SyncStateUpdater> Pipeline<DB, U> {
}
}

#[derive(Metrics)]
#[metrics(scope = "sync")]
struct StageMetrics {
/// The block number of the last commit for a stage.
checkpoint: Gauge,
}

#[derive(Default)]
struct Metrics {
checkpoints: HashMap<StageId, StageMetrics>,
}

impl Metrics {
fn stage_checkpoint(&mut self, stage_id: StageId, progress: u64) {
self.checkpoints
.entry(stage_id)
.or_insert_with(|| StageMetrics::new_with_labels(&[("stage", stage_id.to_string())]))
.checkpoint
.set(progress as f64);
}
}

#[derive(Debug, Default)]
struct PipelineProgress {
/// The progress of the current stage
pub(crate) progress: Option<BlockNumber>,
/// The maximum progress achieved by any stage during the execution of the pipeline.
pub(crate) maximum_progress: Option<BlockNumber>,
/// The minimum progress achieved by any stage during the execution of the pipeline.
pub(crate) minimum_progress: Option<BlockNumber>,
}

impl PipelineProgress {
fn update(&mut self, progress: BlockNumber) {
self.progress = Some(progress);
self.minimum_progress = opt::min(self.minimum_progress, progress);
self.maximum_progress = opt::max(self.maximum_progress, progress);
}

/// Create a sync state from pipeline progress.
fn current_sync_state(&self, downloading: bool) -> SyncState {
match self.progress {
Some(progress) if downloading => SyncState::Downloading { target_block: progress },
Some(progress) => SyncState::Executing { target_block: progress },
None => SyncState::Idle,
}
}

/// Get next control flow step
fn next_ctrl(&self) -> ControlFlow {
match self.progress {
Some(progress) => ControlFlow::Continue { progress },
None => ControlFlow::NoProgress { stage_progress: None },
}
}
}

/// A container for a queued stage.
pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;

Expand Down
39 changes: 39 additions & 0 deletions crates/stages/src/pipeline/progress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use super::ctrl::ControlFlow;
use crate::util::opt;
use reth_interfaces::sync::SyncState;
use reth_primitives::BlockNumber;

#[derive(Debug, Default)]
pub(crate) struct PipelineProgress {
/// The progress of the current stage
pub(crate) progress: Option<BlockNumber>,
/// The maximum progress achieved by any stage during the execution of the pipeline.
pub(crate) maximum_progress: Option<BlockNumber>,
/// The minimum progress achieved by any stage during the execution of the pipeline.
pub(crate) minimum_progress: Option<BlockNumber>,
}

impl PipelineProgress {
pub(crate) fn update(&mut self, progress: BlockNumber) {
self.progress = Some(progress);
self.minimum_progress = opt::min(self.minimum_progress, progress);
self.maximum_progress = opt::max(self.maximum_progress, progress);
}

/// Create a sync state from pipeline progress.
pub(crate) fn current_sync_state(&self, downloading: bool) -> SyncState {
match self.progress {
Some(progress) if downloading => SyncState::Downloading { target_block: progress },
Some(progress) => SyncState::Executing { target_block: progress },
None => SyncState::Idle,
}
}

/// Get next control flow step
pub(crate) fn next_ctrl(&self) -> ControlFlow {
match self.progress {
Some(progress) => ControlFlow::Continue { progress },
None => ControlFlow::NoProgress { stage_progress: None },
}
}
}
26 changes: 26 additions & 0 deletions crates/stages/src/pipeline/sync_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use crate::StageId;
use metrics::Gauge;
use reth_metrics_derive::Metrics;
use std::collections::HashMap;

#[derive(Metrics)]
#[metrics(scope = "sync")]
pub(crate) struct StageMetrics {
/// The block number of the last commit for a stage.
checkpoint: Gauge,
}

#[derive(Default)]
pub(crate) struct Metrics {
checkpoints: HashMap<StageId, StageMetrics>,
}

impl Metrics {
pub(crate) fn stage_checkpoint(&mut self, stage_id: StageId, progress: u64) {
self.checkpoints
.entry(stage_id)
.or_insert_with(|| StageMetrics::new_with_labels(&[("stage", stage_id.to_string())]))
.checkpoint
.set(progress as f64);
}
}