Skip to content

Commit

Permalink
feat: support stop at specific height (#1581)
Browse files Browse the repository at this point in the history
* feat: support stop at specific height

* fix: fix fmt and clippy warnings
  • Loading branch information
jjyr authored Nov 21, 2023
1 parent 7610890 commit f692b86
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 6 deletions.
22 changes: 18 additions & 4 deletions core/cli/src/args/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use clap::Parser;

use common_config_parser::types::Config;
use common_version::Version;
use core_run::KeyProvider;
use core_run::{KeyProvider, StopOpt};

use crate::{
error::{Error, Result},
Expand All @@ -18,7 +18,11 @@ pub struct RunArgs {
value_name = "CONFIG_FILE",
help = "File path of client configurations."
)]
pub config: Config,
pub config: Config,
#[arg(long = "mine-blocks", help = "Exit after mine N blocks")]
pub mine_blocks: Option<u64>,
#[arg(long = "mine-to-height", help = "Exit when reach the height")]
pub mine_to_height: Option<u64>,
}

impl RunArgs {
Expand All @@ -28,7 +32,17 @@ impl RunArgs {
kernel_version: Version,
key_provider: Option<K>,
) -> Result<()> {
let Self { config } = self;
let Self {
config,
mine_blocks,
mine_to_height,
} = self;

let stop_opt = match (mine_blocks, mine_to_height) {
(Some(blocks), None) => Some(StopOpt::MineNBlocks(blocks)),
(None, Some(height)) => Some(StopOpt::MineToHeight(height)),
_ => None,
};

utils::check_version(
&config.data_path_for_version(),
Expand All @@ -38,6 +52,6 @@ impl RunArgs {
utils::register_log(&config);

let version = application_version.to_string();
core_run::run(version, config, key_provider).map_err(Error::Running)
core_run::run(version, config, key_provider, stop_opt).map_err(Error::Running)
}
}
3 changes: 3 additions & 0 deletions core/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use common_apm::tracing::{AxonTracer, Tag};
use common_apm_derive::trace_span;
use common_crypto::PublicKey as _;

use crate::stop_signal::StopSignal;
use crate::wal::{ConsensusWal, SignedTxsWAL};
use crate::{
engine::ConsensusEngine, status::StatusAgent, util::OverlordCrypto, ConsensusError,
Expand Down Expand Up @@ -109,6 +110,7 @@ impl<Adapter: ConsensusAdapter + 'static> OverlordConsensus<Adapter> {
adapter: Arc<Adapter>,
lock: Arc<AsyncMutex<()>>,
consensus_wal: Arc<ConsensusWal>,
stop_signal: StopSignal,
) -> Self {
let engine = Arc::new(ConsensusEngine::new(
status,
Expand All @@ -118,6 +120,7 @@ impl<Adapter: ConsensusAdapter + 'static> OverlordConsensus<Adapter> {
Arc::clone(&crypto),
lock,
consensus_wal,
stop_signal,
));
let status = engine.status();
let metadata = adapter
Expand Down
13 changes: 13 additions & 0 deletions core/consensus/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::message::{
END_GOSSIP_SIGNED_VOTE,
};
use crate::status::{CurrentStatus, StatusAgent};
use crate::stop_signal::StopSignal;
use crate::util::{digest_signed_transactions, time_now, OverlordCrypto};
use crate::wal::{ConsensusWal, SignedTxsWAL};
use crate::ConsensusError;
Expand All @@ -52,6 +53,8 @@ pub struct ConsensusEngine<Adapter> {
last_commit_time: RwLock<u64>,
consensus_wal: Arc<ConsensusWal>,
last_check_block_fail_reason: RwLock<String>,

stop_signal: StopSignal,
}

#[async_trait]
Expand Down Expand Up @@ -253,6 +256,14 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<Proposal> for ConsensusEngine<A
current_number: u64,
commit: Commit<Proposal>,
) -> Result<Status, Box<dyn Error + Send>> {
self.stop_signal
.check_height_and_send(current_number.saturating_sub(1));
if self.stop_signal.is_stopped() {
return Err(
ProtocolError::from(ConsensusError::Other("node is shutdown".to_string())).into(),
);
}

let lock = self.lock.try_lock();
if lock.is_err() {
return Err(ProtocolError::from(ConsensusError::LockInSync).into());
Expand Down Expand Up @@ -509,6 +520,7 @@ impl<Adapter: ConsensusAdapter + 'static> ConsensusEngine<Adapter> {
crypto: Arc<OverlordCrypto>,
lock: Arc<AsyncMutex<()>>,
consensus_wal: Arc<ConsensusWal>,
stop_signal: StopSignal,
) -> Self {
Self {
status,
Expand All @@ -521,6 +533,7 @@ impl<Adapter: ConsensusAdapter + 'static> ConsensusEngine<Adapter> {
last_commit_time: RwLock::new(time_now()),
consensus_wal,
last_check_block_fail_reason: RwLock::new(String::new()),
stop_signal,
}
}

Expand Down
1 change: 1 addition & 0 deletions core/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod synchronization;
#[cfg(test)]
mod tests;

pub mod stop_signal;
pub mod types;
pub mod util;
pub mod wal;
Expand Down
47 changes: 47 additions & 0 deletions core/consensus/src/stop_signal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::sync::RwLock;

use protocol::tokio::{self};

pub enum StopOpt {
MineNBlocks(u64),
MineToHeight(u64),
}

type SignalSender = tokio::sync::oneshot::Sender<()>;

pub struct StopSignal {
tx: RwLock<Option<SignalSender>>,
stop_at_height: Option<u64>,
}

impl StopSignal {
pub fn new(tx: SignalSender) -> Self {
Self {
tx: RwLock::new(Some(tx)),
stop_at_height: None,
}
}

pub fn with_stop_at(tx: SignalSender, height: u64) -> Self {
Self {
tx: RwLock::new(Some(tx)),
stop_at_height: Some(height),
}
}

pub fn check_height_and_send(&self, height: u64) {
if Some(height) == self.stop_at_height {
self.send();
}
}

pub fn send(&self) {
if let Some(tx) = self.tx.write().unwrap().take() {
let _ = tx.send(());
}
}

pub fn is_stopped(&self) -> bool {
self.tx.read().unwrap().is_none()
}
}
24 changes: 22 additions & 2 deletions core/run/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use common_config_parser::types::spec::{ChainSpec, InitialAccount};
use common_config_parser::types::{Config, ConfigMempool};
use common_crypto::{BlsPrivateKey, BlsPublicKey, Secp256k1, Secp256k1PrivateKey, ToPublicKey};

pub use core_consensus::stop_signal::StopOpt;
use core_consensus::stop_signal::StopSignal;
use protocol::tokio::{
self, runtime::Builder as RuntimeBuilder, sync::Mutex as AsyncMutex, time::sleep,
};
Expand Down Expand Up @@ -80,6 +82,7 @@ pub fn run<K: KeyProvider>(
version: String,
config: Config,
key_provider: Option<K>,
stop_opt: Option<StopOpt>,
) -> ProtocolResult<()> {
let path_rocksdb = config.data_path_for_rocksdb();
if !path_rocksdb.exists() {
Expand All @@ -103,7 +106,7 @@ pub fn run<K: KeyProvider>(
config.executor.triedb_cache_size,
)?;
log::info!("Start all services.");
start(version, config, key_provider, &db_group).await
start(version, config, key_provider, &db_group, stop_opt).await
})?;
rt.shutdown_timeout(std::time::Duration::from_secs(1));

Expand All @@ -115,6 +118,7 @@ async fn start<K: KeyProvider>(
config: Config,
key_provider: Option<K>,
db_group: &DatabaseGroup,
stop_opt: Option<StopOpt>,
) -> ProtocolResult<()> {
let storage = db_group.storage();
let trie_db = db_group.trie_db();
Expand All @@ -140,6 +144,18 @@ async fn start<K: KeyProvider>(

log::info!("At block number {}", current_block.header.number + 1);

let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>();
let stop_signal = match stop_opt {
Some(opt) => {
let height = match opt {
StopOpt::MineNBlocks(n) => current_block.header.number + n,
StopOpt::MineToHeight(height) => height,
};
StopSignal::with_stop_at(stop_tx, height)
}
None => StopSignal::new(stop_tx),
};

// Init network
let mut network_service =
init_network_service(&config, current_block.header.chain_id, key_provider)?;
Expand Down Expand Up @@ -240,6 +256,7 @@ async fn start<K: KeyProvider>(
Arc::clone(&consensus_adapter),
Arc::clone(&lock),
Arc::new(ConsensusWal::new(consensus_wal_path)),
stop_signal,
)
.await;
Arc::new(overlord_consensus)
Expand Down Expand Up @@ -287,7 +304,10 @@ async fn start<K: KeyProvider>(
// Run consensus
run_overlord_consensus(metadata, validators, current_block, overlord_consensus);

components::system::set_ctrl_c_handle().await;
tokio::select! {
() = components::system::set_ctrl_c_handle() => {}
_ = stop_rx => {}
}
components::profiling::stop();

Ok(())
Expand Down

0 comments on commit f692b86

Please sign in to comment.