diff --git a/core/cli/src/args/run.rs b/core/cli/src/args/run.rs index 49136dd90..1299955e0 100644 --- a/core/cli/src/args/run.rs +++ b/core/cli/src/args/run.rs @@ -2,7 +2,7 @@ use clap::Parser; use common_config_parser::types::Config; use common_version::Version; -use core_run::KeyProvider; +use core_run::{KeyProvider, StopOpt}; use crate::{ error::{Error, Result}, @@ -18,7 +18,11 @@ pub struct RunArgs { value_name = "CONFIG_FILE", help = "File path of client configurations." )] - pub config: Config, + pub config: Config, + #[arg(long = "mine-blocks", help = "Exit after mine N blocks")] + pub mine_blocks: Option, + #[arg(long = "mine-to-height", help = "Exit when reach the height")] + pub mine_to_height: Option, } impl RunArgs { @@ -28,7 +32,17 @@ impl RunArgs { kernel_version: Version, key_provider: Option, ) -> Result<()> { - let Self { config } = self; + let Self { + config, + mine_blocks, + mine_to_height, + } = self; + + let stop_opt = match (mine_blocks, mine_to_height) { + (Some(blocks), None) => Some(StopOpt::MineNBlocks(blocks)), + (None, Some(height)) => Some(StopOpt::MineToHeight(height)), + _ => None, + }; utils::check_version( &config.data_path_for_version(), @@ -38,6 +52,6 @@ impl RunArgs { utils::register_log(&config); let version = application_version.to_string(); - core_run::run(version, config, key_provider).map_err(Error::Running) + core_run::run(version, config, key_provider, stop_opt).map_err(Error::Running) } } diff --git a/core/consensus/src/consensus.rs b/core/consensus/src/consensus.rs index 78997bddf..714847fb7 100644 --- a/core/consensus/src/consensus.rs +++ b/core/consensus/src/consensus.rs @@ -15,6 +15,7 @@ use common_apm::tracing::{AxonTracer, Tag}; use common_apm_derive::trace_span; use common_crypto::PublicKey as _; +use crate::stop_signal::StopSignal; use crate::wal::{ConsensusWal, SignedTxsWAL}; use crate::{ engine::ConsensusEngine, status::StatusAgent, util::OverlordCrypto, ConsensusError, @@ -109,6 +110,7 @@ impl OverlordConsensus { adapter: Arc, lock: Arc>, consensus_wal: Arc, + stop_signal: StopSignal, ) -> Self { let engine = Arc::new(ConsensusEngine::new( status, @@ -118,6 +120,7 @@ impl OverlordConsensus { Arc::clone(&crypto), lock, consensus_wal, + stop_signal, )); let status = engine.status(); let metadata = adapter diff --git a/core/consensus/src/engine.rs b/core/consensus/src/engine.rs index 88f2a6c7d..ef85e1ab6 100644 --- a/core/consensus/src/engine.rs +++ b/core/consensus/src/engine.rs @@ -33,6 +33,7 @@ use crate::message::{ END_GOSSIP_SIGNED_VOTE, }; use crate::status::{CurrentStatus, StatusAgent}; +use crate::stop_signal::StopSignal; use crate::util::{digest_signed_transactions, time_now, OverlordCrypto}; use crate::wal::{ConsensusWal, SignedTxsWAL}; use crate::ConsensusError; @@ -52,6 +53,8 @@ pub struct ConsensusEngine { last_commit_time: RwLock, consensus_wal: Arc, last_check_block_fail_reason: RwLock, + + stop_signal: StopSignal, } #[async_trait] @@ -253,6 +256,14 @@ impl Engine for ConsensusEngine, ) -> Result> { + self.stop_signal + .check_height_and_send(current_number.saturating_sub(1)); + if self.stop_signal.is_stopped() { + return Err( + ProtocolError::from(ConsensusError::Other("node is shutdown".to_string())).into(), + ); + } + let lock = self.lock.try_lock(); if lock.is_err() { return Err(ProtocolError::from(ConsensusError::LockInSync).into()); @@ -509,6 +520,7 @@ impl ConsensusEngine { crypto: Arc, lock: Arc>, consensus_wal: Arc, + stop_signal: StopSignal, ) -> Self { Self { status, @@ -521,6 +533,7 @@ impl ConsensusEngine { last_commit_time: RwLock::new(time_now()), consensus_wal, last_check_block_fail_reason: RwLock::new(String::new()), + stop_signal, } } diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs index d9bc78434..429b4fcf6 100644 --- a/core/consensus/src/lib.rs +++ b/core/consensus/src/lib.rs @@ -8,6 +8,7 @@ pub mod synchronization; #[cfg(test)] mod tests; +pub mod stop_signal; pub mod types; pub mod util; pub mod wal; diff --git a/core/consensus/src/stop_signal.rs b/core/consensus/src/stop_signal.rs new file mode 100644 index 000000000..6b1d7aef1 --- /dev/null +++ b/core/consensus/src/stop_signal.rs @@ -0,0 +1,47 @@ +use std::sync::RwLock; + +use protocol::tokio::{self}; + +pub enum StopOpt { + MineNBlocks(u64), + MineToHeight(u64), +} + +type SignalSender = tokio::sync::oneshot::Sender<()>; + +pub struct StopSignal { + tx: RwLock>, + stop_at_height: Option, +} + +impl StopSignal { + pub fn new(tx: SignalSender) -> Self { + Self { + tx: RwLock::new(Some(tx)), + stop_at_height: None, + } + } + + pub fn with_stop_at(tx: SignalSender, height: u64) -> Self { + Self { + tx: RwLock::new(Some(tx)), + stop_at_height: Some(height), + } + } + + pub fn check_height_and_send(&self, height: u64) { + if Some(height) == self.stop_at_height { + self.send(); + } + } + + pub fn send(&self) { + if let Some(tx) = self.tx.write().unwrap().take() { + let _ = tx.send(()); + } + } + + pub fn is_stopped(&self) -> bool { + self.tx.read().unwrap().is_none() + } +} diff --git a/core/run/src/lib.rs b/core/run/src/lib.rs index ad5257955..55c779d40 100644 --- a/core/run/src/lib.rs +++ b/core/run/src/lib.rs @@ -5,6 +5,8 @@ use common_config_parser::types::spec::{ChainSpec, InitialAccount}; use common_config_parser::types::{Config, ConfigMempool}; use common_crypto::{BlsPrivateKey, BlsPublicKey, Secp256k1, Secp256k1PrivateKey, ToPublicKey}; +pub use core_consensus::stop_signal::StopOpt; +use core_consensus::stop_signal::StopSignal; use protocol::tokio::{ self, runtime::Builder as RuntimeBuilder, sync::Mutex as AsyncMutex, time::sleep, }; @@ -80,6 +82,7 @@ pub fn run( version: String, config: Config, key_provider: Option, + stop_opt: Option, ) -> ProtocolResult<()> { let path_rocksdb = config.data_path_for_rocksdb(); if !path_rocksdb.exists() { @@ -103,7 +106,7 @@ pub fn run( config.executor.triedb_cache_size, )?; log::info!("Start all services."); - start(version, config, key_provider, &db_group).await + start(version, config, key_provider, &db_group, stop_opt).await })?; rt.shutdown_timeout(std::time::Duration::from_secs(1)); @@ -115,6 +118,7 @@ async fn start( config: Config, key_provider: Option, db_group: &DatabaseGroup, + stop_opt: Option, ) -> ProtocolResult<()> { let storage = db_group.storage(); let trie_db = db_group.trie_db(); @@ -140,6 +144,18 @@ async fn start( log::info!("At block number {}", current_block.header.number + 1); + let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>(); + let stop_signal = match stop_opt { + Some(opt) => { + let height = match opt { + StopOpt::MineNBlocks(n) => current_block.header.number + n, + StopOpt::MineToHeight(height) => height, + }; + StopSignal::with_stop_at(stop_tx, height) + } + None => StopSignal::new(stop_tx), + }; + // Init network let mut network_service = init_network_service(&config, current_block.header.chain_id, key_provider)?; @@ -240,6 +256,7 @@ async fn start( Arc::clone(&consensus_adapter), Arc::clone(&lock), Arc::new(ConsensusWal::new(consensus_wal_path)), + stop_signal, ) .await; Arc::new(overlord_consensus) @@ -287,7 +304,10 @@ async fn start( // Run consensus run_overlord_consensus(metadata, validators, current_block, overlord_consensus); - components::system::set_ctrl_c_handle().await; + tokio::select! { + () = components::system::set_ctrl_c_handle() => {} + _ = stop_rx => {} + } components::profiling::stop(); Ok(())