Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 20 additions & 24 deletions src/chain_sync/chain_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,36 @@
//!
//! The state machine does not do any network requests or validation. Those are
//! handled by an external actor.
use crate::libp2p::hello::HelloRequest;
use crate::message_pool::MessagePool;
use crate::message_pool::MpoolRpcProvider;
use crate::networks::calculate_expected_epoch;
use crate::shim::clock::ChainEpoch;
use crate::state_manager::StateManager;
use crate::utils::misc::env::is_env_truthy;

use super::network_context::SyncNetworkContext;
use crate::{
blocks::{Block, FullTipset, Tipset, TipsetKey},
chain::ChainStore,
chain_sync::{
ForkSyncInfo, ForkSyncStage, SyncStatus, SyncStatusReport, TipsetValidator,
bad_block_cache::BadBlockCache, metrics, tipset_syncer::validate_tipset,
},
libp2p::{NetworkEvent, PubsubMessage, hello::HelloRequest},
message_pool::{MessagePool, MpoolRpcProvider},
networks::calculate_expected_epoch,
shim::clock::ChainEpoch,
state_manager::StateManager,
utils::misc::env::is_env_truthy,
};
use ahash::{HashMap, HashSet};
use chrono::Utc;
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools;
use libp2p::PeerId;
use parking_lot::Mutex;
use std::time::Instant;
use std::{ops::Deref as _, sync::Arc};
use parking_lot::{Mutex, RwLock};
use std::{ops::Deref as _, sync::Arc, time::Instant};
use tokio::{sync::Notify, task::JoinSet};
use tracing::{debug, error, info, trace, warn};

use super::network_context::SyncNetworkContext;
use crate::chain_sync::sync_status::SyncStatusReport;
use crate::chain_sync::tipset_syncer::validate_tipset;
use crate::chain_sync::{ForkSyncInfo, ForkSyncStage};
use crate::{
blocks::{Block, FullTipset, Tipset, TipsetKey},
chain::ChainStore,
chain_sync::{TipsetValidator, bad_block_cache::BadBlockCache, metrics},
libp2p::{NetworkEvent, PubsubMessage},
};
use parking_lot::RwLock;

pub struct ChainFollower<DB> {
/// Syncing status of the chain
pub sync_status: Arc<RwLock<SyncStatusReport>>,
pub sync_status: SyncStatus,

/// Manages retrieving and updating state objects
state_manager: Arc<StateManager<DB>>,
Expand Down Expand Up @@ -138,7 +134,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
tipset_receiver: flume::Receiver<Arc<FullTipset>>,
network: SyncNetworkContext<DB>,
mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
sync_status: Arc<RwLock<SyncStatusReport>>,
sync_status: SyncStatus,
genesis: Arc<Tipset>,
stateless_mode: bool,
) -> anyhow::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion src/chain_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ pub use self::{
chain_follower::{ChainFollower, load_full_tipset},
chain_muxer::SyncConfig,
consensus::collect_errs,
sync_status::{ForkSyncInfo, ForkSyncStage, NodeSyncStatus, SyncStatusReport},
sync_status::{ForkSyncInfo, ForkSyncStage, NodeSyncStatus, SyncStatus, SyncStatusReport},
validation::{TipsetValidationError, TipsetValidator},
};
3 changes: 3 additions & 0 deletions src/chain_sync/sync_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::shim::clock::ChainEpoch;
use crate::state_manager::StateManager;
use chrono::{DateTime, Utc};
use fvm_ipld_blockstore::Blockstore;
use parking_lot::RwLock;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
Expand Down Expand Up @@ -101,6 +102,8 @@ pub struct ForkSyncInfo {
pub(crate) last_updated: Option<DateTime<Utc>>,
}

/// Shared handle to the node's [`SyncStatusReport`]: the report sits behind an
/// `RwLock` and is reference-counted with `Arc`, so the same report can be
/// held and accessed from several places.
pub type SyncStatus = Arc<RwLock<SyncStatusReport>>;

/// Contains information about the current status of the node's synchronization process.
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, JsonSchema)]
pub struct SyncStatusReport {
Expand Down
9 changes: 5 additions & 4 deletions src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ pub mod main;
use crate::blocks::Tipset;
use crate::chain::HeadChange;
use crate::chain::index::ResolveNullTipset;
use crate::chain_sync::ChainFollower;
use crate::chain_sync::network_context::SyncNetworkContext;
use crate::chain_sync::{ChainFollower, SyncStatus};
use crate::cli_shared::snapshot;
use crate::cli_shared::{
chain_path,
Expand Down Expand Up @@ -561,8 +561,9 @@ pub(super) async fn start(
_ = snap_gc_reboot_rx.recv_async() => {
snap_gc.cleanup_before_reboot().await;
}
result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), |ctx| {
result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), |ctx, sync_status| {
snap_gc.set_db(ctx.db.clone());
snap_gc.set_sync_status(sync_status);
snap_gc.set_car_db_head_epoch(ctx.db.heaviest_tipset().map(|ts|ts.epoch()).unwrap_or_default());
}) => {
break result
Expand All @@ -576,7 +577,7 @@ pub(super) async fn start_services(
opts: &CliOpts,
mut config: Config,
shutdown_send: mpsc::Sender<()>,
on_app_context_and_db_initialized: impl Fn(&AppContext),
on_app_context_and_db_initialized: impl FnOnce(&AppContext, SyncStatus),
) -> anyhow::Result<()> {
// Cleanup the collector prometheus metrics registry on start
crate::metrics::reset_collector_registry();
Expand Down Expand Up @@ -608,7 +609,7 @@ pub(super) async fn start_services(
services.shutdown().await;
return Ok(());
}
on_app_context_and_db_initialized(&ctx);
on_app_context_and_db_initialized(&ctx, chain_follower.sync_status.clone());
warmup_in_background(&ctx);
ctx.state_manager.populate_cache();
maybe_start_metrics_service(&mut services, &config, &ctx).await?;
Expand Down
32 changes: 24 additions & 8 deletions src/db/gc/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub struct SnapshotGarbageCollector<DB> {
running: AtomicBool,
blessed_lite_snapshot: RwLock<Option<PathBuf>>,
db: RwLock<Option<Arc<DB>>>,
sync_status: RwLock<Option<crate::chain_sync::SyncStatus>>,
// On mainnet, it takes ~50MiB-200MiB RAM, depending on the time cost of snapshot export
memory_db: RwLock<Option<HashMap<Cid, Vec<u8>>>>,
memory_db_head_key: RwLock<Option<TipsetKey>>,
Expand Down Expand Up @@ -111,6 +112,7 @@ where
running: AtomicBool::new(false),
blessed_lite_snapshot: RwLock::new(None),
db: RwLock::new(None),
sync_status: RwLock::new(None),
memory_db: RwLock::new(None),
memory_db_head_key: RwLock::new(None),
exported_head_key: RwLock::new(None),
Expand All @@ -132,13 +134,18 @@ where
*self.car_db_head_epoch.write() = Some(epoch);
}

/// Installs the shared sync status handle on this garbage collector,
/// replacing any handle that was stored previously.
pub fn set_sync_status(&self, sync_status: crate::chain_sync::SyncStatus) {
    // Take the write guard explicitly, then overwrite the stored handle.
    let mut slot = self.sync_status.write();
    *slot = Some(sync_status);
}

pub async fn event_loop(&self) {
while self.trigger_rx.recv_async().await.is_ok() {
if self.running.load(Ordering::Relaxed) {
Comment thread
akaladarshi marked this conversation as resolved.
tracing::warn!("snap gc has already been running");
} else {
self.running.store(true, Ordering::Relaxed);
if let Err(e) = self.export_snapshot().await {
self.running.store(false, Ordering::Relaxed);
tracing::warn!("{e}");
}
}
Expand Down Expand Up @@ -170,18 +177,22 @@ where
);
loop {
if !self.running.load(Ordering::Relaxed)
&& let Some(db) = &*self.db.read()
&& let Some(car_db_head_epoch) = *self.car_db_head_epoch.read()
&& let Ok(head_key) = HeaviestTipsetKeyProvider::heaviest_tipset_key(db)
&& let Ok(head) = Tipset::load_required(db, &head_key)
&& let Some(sync_status) = &*self.sync_status.read()
{
let head_epoch = head.epoch();
if head_epoch - car_db_head_epoch >= snap_gc_interval_epochs
let sync_status = &*sync_status.read();
let network_head_epoch = sync_status.network_head_epoch;
let head_epoch = sync_status.current_head_epoch;
if head_epoch > 0 // sync_status has been initialized
&& head_epoch <= network_head_epoch // head epoch is within a sane range
&& sync_status.is_synced() // chain is in sync
&& sync_status.active_forks.is_empty() // no active fork
&& head_epoch - car_db_head_epoch >= snap_gc_interval_epochs // the gap between chain head and car_db head is above threshold
&& self.trigger_tx.try_send(()).is_ok()
{
tracing::info!(%car_db_head_epoch, %head_epoch, %snap_gc_interval_epochs, "Snap GC scheduled");
tracing::info!(%car_db_head_epoch, %head_epoch, %network_head_epoch, %snap_gc_interval_epochs, "Snap GC scheduled");
} else {
tracing::trace!(%car_db_head_epoch, %head_epoch, %snap_gc_interval_epochs, "Snap GC not scheduled");
tracing::debug!(%car_db_head_epoch, %head_epoch, %network_head_epoch, %snap_gc_interval_epochs, "Snap GC not scheduled");
}
}
tokio::time::sleep(snap_gc_check_interval).await;
Expand Down Expand Up @@ -219,6 +230,7 @@ where
}
map
});
let start = Instant::now();
let (head_ts, _) = crate::chain::export_from_head::<Sha256>(
&db,
self.recent_state_roots,
Expand All @@ -235,7 +247,11 @@ where
head_ts.epoch()
));
temp_path.persist(&target_path)?;
tracing::info!("exported lite snapshot at {}", target_path.display());
tracing::info!(
"exported lite snapshot at {}, took {}",
target_path.display(),
humantime::format_duration(start.elapsed())
);
*self.blessed_lite_snapshot.write() = Some(target_path);
*self.exported_head_key.write() = Some(head_ts.key().clone());

Expand Down
12 changes: 5 additions & 7 deletions src/health/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ use axum::{
response::{IntoResponse, Response},
routing::get,
};
use parking_lot::RwLock;

use crate::chain_sync::SyncStatusReport;
use crate::{Config, libp2p::PeerManager, networks::ChainConfig};
use crate::{Config, chain_sync::SyncStatus, libp2p::PeerManager, networks::ChainConfig};

mod endpoints;

Expand All @@ -22,7 +20,7 @@ pub(crate) struct ForestState {
pub config: Config,
pub chain_config: Arc<ChainConfig>,
pub genesis_timestamp: u64,
pub sync_status: Arc<RwLock<SyncStatusReport>>,
pub sync_status: SyncStatus,
pub peer_manager: Arc<PeerManager>,
}

Expand Down Expand Up @@ -60,11 +58,11 @@ impl IntoResponse for AppError {
mod test {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use super::*;
use crate::Client;
use crate::chain_sync::{NodeSyncStatus, SyncStatusReport};
use crate::cli_shared::cli::ChainIndexerConfig;

use super::*;
use crate::chain_sync::NodeSyncStatus;
use parking_lot::RwLock;
use reqwest::StatusCode;

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ pub struct RPCState<DB> {
pub mpool: Arc<crate::message_pool::MessagePool<crate::message_pool::MpoolRpcProvider<DB>>>,
pub bad_blocks: Option<Arc<crate::chain_sync::BadBlockCache>>,
pub msgs_in_tipset: Arc<crate::chain::store::MsgsInTipsetCache>,
pub sync_status: Arc<parking_lot::RwLock<crate::chain_sync::SyncStatusReport>>,
pub sync_status: crate::chain_sync::SyncStatus,
pub eth_event_handler: Arc<EthEventHandler>,
pub sync_network_context: SyncNetworkContext<DB>,
pub tipset_send: flume::Sender<Arc<FullTipset>>,
Expand Down
Loading