From 1d3713dc3f5aba3c705a3fdca941b80d11682944 Mon Sep 17 00:00:00 2001 From: Boyu Yang Date: Tue, 29 Aug 2023 21:41:35 +0800 Subject: [PATCH 1/8] refactor(core-run): split error into a single file --- core/run/src/error.rs | 33 ++++++++++++++++++++++++++++++ core/run/src/lib.rs | 47 ++++++++----------------------------------- 2 files changed, 41 insertions(+), 39 deletions(-) create mode 100644 core/run/src/error.rs diff --git a/core/run/src/error.rs b/core/run/src/error.rs new file mode 100644 index 000000000..60ae398fc --- /dev/null +++ b/core/run/src/error.rs @@ -0,0 +1,33 @@ +use protocol::{Display, From, ProtocolError, ProtocolErrorKind}; + +#[derive(Debug, Display, From)] +pub enum MainError { + #[display(fmt = "The axon configuration read failed {:?}", _0)] + ConfigParse(common_config_parser::ParseError), + + #[display(fmt = "{:?}", _0)] + Io(std::io::Error), + + #[display(fmt = "Toml fails to parse genesis {:?}", _0)] + GenesisTomlDe(toml::de::Error), + + #[display(fmt = "crypto error {:?}", _0)] + Crypto(common_crypto::Error), + + #[display(fmt = "{:?}", _0)] + Utf8(std::string::FromUtf8Error), + + #[display(fmt = "{:?}", _0)] + JSONParse(serde_json::error::Error), + + #[display(fmt = "other error {:?}", _0)] + Other(String), +} + +impl std::error::Error for MainError {} + +impl From for ProtocolError { + fn from(error: MainError) -> ProtocolError { + ProtocolError::new(ProtocolErrorKind::Main, Box::new(error)) + } +} diff --git a/core/run/src/lib.rs b/core/run/src/lib.rs index db7ed3c80..e717a81d7 100644 --- a/core/run/src/lib.rs +++ b/core/run/src/lib.rs @@ -34,10 +34,7 @@ use protocol::types::{ Account, Block, ExecResp, MerkleRoot, Metadata, Proposal, RichBlock, SignedTransaction, Validator, ValidatorExtend, H256, NIL_DATA, RLP_NULL, }; -use protocol::{ - async_trait, lazy::CHAIN_ID, trie::DB as TrieDB, Display, From, ProtocolError, - ProtocolErrorKind, ProtocolResult, -}; +use protocol::{async_trait, lazy::CHAIN_ID, trie::DB as TrieDB, ProtocolResult}; use core_api::{jsonrpc::run_jsonrpc_server, DefaultAPIAdapter}; use core_consensus::message::{ @@ -78,6 +75,13 @@ pub use core_network::{KeyProvider, SecioKeyPair}; #[global_allocator] pub static JEMALLOC: Jemalloc = Jemalloc; +mod error; + +#[cfg(test)] +mod tests; + +pub use error::MainError; + #[derive(Debug)] pub struct Axon { version: String, @@ -87,9 +91,6 @@ pub struct Axon { state_root: MerkleRoot, } -#[cfg(test)] -mod tests; - impl Axon { pub fn new(version: String, config: Config, spec: ChainSpec, genesis: RichBlock) -> Axon { Axon { @@ -833,38 +834,6 @@ impl Axon { } } -#[derive(Debug, Display, From)] -pub enum MainError { - #[display(fmt = "The axon configuration read failed {:?}", _0)] - ConfigParse(common_config_parser::ParseError), - - #[display(fmt = "{:?}", _0)] - Io(std::io::Error), - - #[display(fmt = "Toml fails to parse genesis {:?}", _0)] - GenesisTomlDe(toml::de::Error), - - #[display(fmt = "crypto error {:?}", _0)] - Crypto(common_crypto::Error), - - #[display(fmt = "{:?}", _0)] - Utf8(std::string::FromUtf8Error), - - #[display(fmt = "{:?}", _0)] - JSONParse(serde_json::error::Error), - - #[display(fmt = "other error {:?}", _0)] - Other(String), -} - -impl std::error::Error for MainError {} - -impl From for ProtocolError { - fn from(error: MainError) -> ProtocolError { - ProtocolError::new(ProtocolErrorKind::Main, Box::new(error)) - } -} - #[derive(Clone)] enum KeyP { Custom(K), From 33b04d35fb15360dd8eed0932a1f9a235c20d016 Mon Sep 17 00:00:00 2001 From: Boyu Yang Date: Sat, 26 Aug 2023 17:09:49 +0800 Subject: [PATCH 2/8] refactor(core-run): split key-provider into a single file --- core/run/src/key_provider.rs | 57 ++++++++++++++++++++++++++++++++++ core/run/src/lib.rs | 59 +++--------------------------------- 2 files changed, 61 insertions(+), 55 deletions(-) create mode 100644 core/run/src/key_provider.rs diff --git a/core/run/src/key_provider.rs b/core/run/src/key_provider.rs new file mode 100644 index 000000000..2e0f06297 --- /dev/null +++ b/core/run/src/key_provider.rs @@ -0,0 +1,57 @@ +use core_network::{KeyProvider, SecioError, SecioKeyPair}; + +use protocol::async_trait; + +#[derive(Clone)] +pub(crate) enum KeyP { + Custom(K), + Default(SecioKeyPair), +} + +#[async_trait] +impl KeyProvider for KeyP +where + K: KeyProvider, +{ + type Error = SecioError; + + async fn sign_ecdsa_async + Send>( + &self, + message: T, + ) -> Result, Self::Error> { + match self { + KeyP::Custom(k) => k.sign_ecdsa_async(message).await.map_err(Into::into), + KeyP::Default(k) => k.sign_ecdsa_async(message).await, + } + } + + /// Constructs a signature for `msg` using the secret key `sk` + fn sign_ecdsa>(&self, message: T) -> Result, Self::Error> { + match self { + KeyP::Custom(k) => k.sign_ecdsa(message).map_err(Into::into), + KeyP::Default(k) => k.sign_ecdsa(message), + } + } + + /// Creates a new public key from the [`KeyProvider`]. + fn pubkey(&self) -> Vec { + match self { + KeyP::Custom(k) => k.pubkey(), + KeyP::Default(k) => k.pubkey(), + } + } + + /// Checks that `sig` is a valid ECDSA signature for `msg` using the + /// pubkey. + fn verify_ecdsa(&self, pubkey: P, message: T, signature: F) -> bool + where + P: AsRef<[u8]>, + T: AsRef<[u8]>, + F: AsRef<[u8]>, + { + match self { + KeyP::Custom(k) => k.verify_ecdsa(pubkey, message, signature), + KeyP::Default(k) => k.verify_ecdsa(pubkey, message, signature), + } + } +} diff --git a/core/run/src/lib.rs b/core/run/src/lib.rs index e717a81d7..3a9172e39 100644 --- a/core/run/src/lib.rs +++ b/core/run/src/lib.rs @@ -34,7 +34,7 @@ use protocol::types::{ Account, Block, ExecResp, MerkleRoot, Metadata, Proposal, RichBlock, SignedTransaction, Validator, ValidatorExtend, H256, NIL_DATA, RLP_NULL, }; -use protocol::{async_trait, lazy::CHAIN_ID, trie::DB as TrieDB, ProtocolResult}; +use protocol::{lazy::CHAIN_ID, trie::DB as TrieDB, ProtocolResult}; use core_api::{jsonrpc::run_jsonrpc_server, DefaultAPIAdapter}; use core_consensus::message::{ @@ -61,7 +61,7 @@ use core_mempool::{ RPC_PULL_TXS, RPC_RESP_PULL_TXS, RPC_RESP_PULL_TXS_SYNC, }; use core_network::{ - observe_listen_port_occupancy, NetworkConfig, NetworkService, PeerId, PeerIdExt, SecioError, + observe_listen_port_occupancy, NetworkConfig, NetworkService, PeerId, PeerIdExt, }; use core_storage::ImplStorage; @@ -76,11 +76,13 @@ pub use core_network::{KeyProvider, SecioKeyPair}; pub static JEMALLOC: Jemalloc = Jemalloc; mod error; +mod key_provider; #[cfg(test)] mod tests; pub use error::MainError; +use key_provider::KeyP; #[derive(Debug)] pub struct Axon { @@ -834,59 +836,6 @@ impl Axon { } } -#[derive(Clone)] -enum KeyP { - Custom(K), - Default(SecioKeyPair), -} -#[async_trait] -impl KeyProvider for KeyP -where - K: KeyProvider, -{ - type Error = SecioError; - - async fn sign_ecdsa_async + Send>( - &self, - message: T, - ) -> Result, Self::Error> { - match self { - KeyP::Custom(k) => k.sign_ecdsa_async(message).await.map_err(Into::into), - KeyP::Default(k) => k.sign_ecdsa_async(message).await, - } - } - - /// Constructs a signature for `msg` using the secret key `sk` - fn sign_ecdsa>(&self, message: T) -> Result, Self::Error> { - match self { - KeyP::Custom(k) => k.sign_ecdsa(message).map_err(Into::into), - KeyP::Default(k) => k.sign_ecdsa(message), - } - } - - /// Creates a new public key from the [`KeyProvider`]. - fn pubkey(&self) -> Vec { - match self { - KeyP::Custom(k) => k.pubkey(), - KeyP::Default(k) => k.pubkey(), - } - } - - /// Checks that `sig` is a valid ECDSA signature for `msg` using the - /// pubkey. - fn verify_ecdsa(&self, pubkey: P, message: T, signature: F) -> bool - where - P: AsRef<[u8]>, - T: AsRef<[u8]>, - F: AsRef<[u8]>, - { - match self { - KeyP::Custom(k) => k.verify_ecdsa(pubkey, message, signature), - KeyP::Default(k) => k.verify_ecdsa(pubkey, message, signature), - } - } -} - async fn init_storage>( config: &ConfigRocksDB, rocksdb_path: P, From a65102611e62bd353e3ef4f49fd75495fa5b736c Mon Sep 17 00:00:00 2001 From: Boyu Yang Date: Sat, 26 Aug 2023 18:20:23 +0800 Subject: [PATCH 3/8] refactor(core-run): split profiling into a single file --- common/memory-tracker/src/lib.rs | 2 +- core/run/src/components/mod.rs | 28 ++++++++++ core/run/src/components/profiling.rs | 47 ++++++++++++++++ core/run/src/lib.rs | 80 +++------------------------- 4 files changed, 82 insertions(+), 75 deletions(-) create mode 100644 core/run/src/components/mod.rs create mode 100644 core/run/src/components/profiling.rs diff --git a/common/memory-tracker/src/lib.rs b/common/memory-tracker/src/lib.rs index 23589d9b4..4fff44f22 100644 --- a/common/memory-tracker/src/lib.rs +++ b/common/memory-tracker/src/lib.rs @@ -4,7 +4,7 @@ use std::{sync::Arc, time::Duration}; use jemalloc_ctl::{epoch, stats}; use log::error; -use rocksdb::ops::{GetColumnFamilys, GetProperty, GetPropertyCF}; +pub use rocksdb::ops::{GetColumnFamilys, GetProperty, GetPropertyCF}; use protocol::tokio; diff --git a/core/run/src/components/mod.rs b/core/run/src/components/mod.rs new file mode 100644 index 000000000..652b382d4 --- /dev/null +++ b/core/run/src/components/mod.rs @@ -0,0 +1,28 @@ +#[cfg(all( + not(target_env = "msvc"), + not(target_os = "macos"), + feature = "jemalloc" +))] +pub(crate) mod profiling; + +#[cfg(not(all( + not(target_env = "msvc"), + not(target_os = "macos"), + feature = "jemalloc" +)))] +pub(crate) mod profiling { + use std::sync::Arc; + + pub(crate) fn start() { + log::warn!("profiling is not supported, so it doesn't start"); + } + pub(crate) fn stop() { + log::warn!("profiling is not supported, so it doesn't require stopping"); + } + pub(crate) fn track_current_process() { + log::warn!("profiling is not supported, so it doesn't track current process"); + } + pub(crate) fn track_db_process(typ: &str, _db: &Arc) { + log::warn!("profiling is not supported, so it doesn't track db process for [{typ}]"); + } +} diff --git a/core/run/src/components/profiling.rs b/core/run/src/components/profiling.rs new file mode 100644 index 000000000..5c4eb4dec --- /dev/null +++ b/core/run/src/components/profiling.rs @@ -0,0 +1,47 @@ +//! Control the profiling related features. + +use std::sync::Arc; + +use common_memory_tracker::{GetColumnFamilys, GetProperty, GetPropertyCF}; +use jemalloc_ctl::{Access, AsName}; +use jemallocator::Jemalloc; +use protocol::tokio; + +#[global_allocator] +pub static JEMALLOC: Jemalloc = Jemalloc; + +pub(crate) fn start() { + set_profile(true); +} + +pub(crate) fn stop() { + set_profile(false); + dump_profile(); +} + +pub(crate) fn track_current_process() { + tokio::spawn(common_memory_tracker::track_current_process()); +} + +pub(crate) fn track_db_process(typ: &'static str, db_ref: &Arc) +where + DB: GetColumnFamilys + GetProperty + GetPropertyCF + Send + Sync + 'static, +{ + let db = Arc::clone(db_ref); + tokio::spawn(common_memory_tracker::track_db_process::(typ, db)); +} + +fn set_profile(is_active: bool) { + let _ = b"prof.active\0" + .name() + .write(is_active) + .map_err(|e| panic!("Set jemalloc profile error {:?}", e)); +} + +fn dump_profile() { + let name = b"profile.out\0".as_ref(); + b"prof.dump\0" + .name() + .write(name) + .expect("Should succeed to dump profile") +} diff --git a/core/run/src/lib.rs b/core/run/src/lib.rs index 3a9172e39..a2cddb8ae 100644 --- a/core/run/src/lib.rs +++ b/core/run/src/lib.rs @@ -3,15 +3,6 @@ use std::{collections::HashMap, panic::PanicInfo, path::Path, sync::Arc, time::Duration}; use backtrace::Backtrace; -#[cfg(all( - not(target_env = "msvc"), - not(target_os = "macos"), - feature = "jemalloc" -))] -use { - jemalloc_ctl::{Access, AsName}, - jemallocator::Jemalloc, -}; use common_apm::metrics::mempool::{MEMPOOL_CO_QUEUE_LEN, MEMPOOL_LEN_GAUGE}; use common_apm::{server::run_prometheus_server, tracing::global_tracer_register}; @@ -67,14 +58,7 @@ use core_storage::ImplStorage; pub use core_network::{KeyProvider, SecioKeyPair}; -#[cfg(all( - not(target_env = "msvc"), - not(target_os = "macos"), - feature = "jemalloc" -))] -#[global_allocator] -pub static JEMALLOC: Jemalloc = Jemalloc; - +mod components; mod error; mod key_provider; @@ -105,12 +89,7 @@ impl Axon { } pub fn run(mut self, key_provider: Option) -> ProtocolResult<()> { - #[cfg(all( - not(target_env = "msvc"), - not(target_os = "macos"), - feature = "jemalloc" - ))] - Self::set_profile(true); + components::profiling::start(); let rt = RuntimeBuilder::new_multi_thread() .enable_all() @@ -256,22 +235,8 @@ impl Axon { // Start prometheus http server Self::run_prometheus_server(self.config.prometheus.clone()); - #[cfg(all( - not(target_env = "msvc"), - not(target_os = "macos"), - feature = "jemalloc" - ))] - tokio::spawn(common_memory_tracker::track_db_process( - "blockdb", - Arc::clone(&inner_db), - )); - - #[cfg(all( - not(target_env = "msvc"), - not(target_os = "macos"), - feature = "jemalloc" - ))] - tokio::spawn(common_memory_tracker::track_current_process()); + components::profiling::track_db_process("blockdb", &inner_db); + components::profiling::track_current_process(); log::info!("node starts"); @@ -407,6 +372,8 @@ impl Axon { Self::set_ctrl_c_handle().await; + components::profiling::stop(); + Ok(()) } @@ -776,16 +743,6 @@ impl Axon { _ = ctrl_c_handler => { log::info!("ctrl + c is pressed, quit.") }, _ = panic_receiver.recv() => { log::info!("child thread panic, quit.") }, }; - - #[cfg(all( - not(target_env = "msvc"), - not(target_os = "macos"), - feature = "jemalloc" - ))] - { - Self::set_profile(false); - Self::dump_profile(); - } } fn panic_log(info: &PanicInfo) { @@ -809,31 +766,6 @@ impl Axon { backtrace, ); } - - #[cfg(all( - not(target_env = "msvc"), - not(target_os = "macos"), - feature = "jemalloc" - ))] - fn set_profile(is_active: bool) { - let _ = b"prof.active\0" - .name() - .write(is_active) - .map_err(|e| panic!("Set jemalloc profile error {:?}", e)); - } - - #[cfg(all( - not(target_env = "msvc"), - not(target_os = "macos"), - feature = "jemalloc" - ))] - fn dump_profile() { - let name = b"profile.out\0".as_ref(); - b"prof.dump\0" - .name() - .write(name) - .expect("Should succeed to dump profile") - } } async fn init_storage>( From 3ee341f63f6059bbdb13e5d2060a18981315fbaa Mon Sep 17 00:00:00 2001 From: Boyu Yang Date: Sat, 26 Aug 2023 18:26:13 +0800 Subject: [PATCH 4/8] refactor(core-run): split system-related into a single file --- core/run/src/components/mod.rs | 2 + core/run/src/components/system.rs | 59 ++++++++++++++++++++++++++++++ core/run/src/lib.rs | 61 +------------------------------ 3 files changed, 63 insertions(+), 59 deletions(-) create mode 100644 core/run/src/components/system.rs diff --git a/core/run/src/components/mod.rs b/core/run/src/components/mod.rs index 652b382d4..f882c1bb3 100644 --- a/core/run/src/components/mod.rs +++ b/core/run/src/components/mod.rs @@ -1,3 +1,5 @@ +pub(crate) mod system; + #[cfg(all( not(target_env = "msvc"), not(target_os = "macos"), diff --git a/core/run/src/components/system.rs b/core/run/src/components/system.rs new file mode 100644 index 000000000..a7a190f1c --- /dev/null +++ b/core/run/src/components/system.rs @@ -0,0 +1,59 @@ +use std::panic::PanicInfo; + +use backtrace::Backtrace; + +use protocol::tokio; +#[cfg(unix)] +use protocol::tokio::signal::unix as os_impl; + +pub(crate) async fn set_ctrl_c_handle() { + let ctrl_c_handler = tokio::spawn(async { + #[cfg(windows)] + let _ = tokio::signal::ctrl_c().await; + #[cfg(unix)] + { + let mut sigtun_int = os_impl::signal(os_impl::SignalKind::interrupt()).unwrap(); + let mut sigtun_term = os_impl::signal(os_impl::SignalKind::terminate()).unwrap(); + tokio::select! { + _ = sigtun_int.recv() => {} + _ = sigtun_term.recv() => {} + }; + } + }); + + // register channel of panic + let (panic_sender, mut panic_receiver) = tokio::sync::mpsc::channel::<()>(1); + + std::panic::set_hook(Box::new(move |info: &PanicInfo| { + let panic_sender = panic_sender.clone(); + panic_log(info); + panic_sender.try_send(()).expect("panic_receiver is droped"); + })); + + tokio::select! { + _ = ctrl_c_handler => { log::info!("ctrl + c is pressed, quit.") }, + _ = panic_receiver.recv() => { log::info!("child thread panic, quit.") }, + }; +} + +fn panic_log(info: &PanicInfo) { + let backtrace = Backtrace::new(); + let thread = std::thread::current(); + let name = thread.name().unwrap_or("unnamed"); + let location = info.location().unwrap(); // The current implementation always returns Some + let msg = match info.payload().downcast_ref::<&'static str>() { + Some(s) => *s, + None => match info.payload().downcast_ref::() { + Some(s) => &**s, + None => "Box", + }, + }; + log::error!( + target: "panic", "thread '{}' panicked at '{}': {}:{} {:?}", + name, + msg, + location.file(), + location.line(), + backtrace, + ); +} diff --git a/core/run/src/lib.rs b/core/run/src/lib.rs index a2cddb8ae..8501e8ebf 100644 --- a/core/run/src/lib.rs +++ b/core/run/src/lib.rs @@ -1,8 +1,6 @@ #![allow(clippy::uninlined_format_args, clippy::mutable_key_type)] -use std::{collections::HashMap, panic::PanicInfo, path::Path, sync::Arc, time::Duration}; - -use backtrace::Backtrace; +use std::{collections::HashMap, path::Path, sync::Arc, time::Duration}; use common_apm::metrics::mempool::{MEMPOOL_CO_QUEUE_LEN, MEMPOOL_LEN_GAUGE}; use common_apm::{server::run_prometheus_server, tracing::global_tracer_register}; @@ -11,8 +9,6 @@ use common_config_parser::types::{Config, ConfigJaeger, ConfigPrometheus, Config use common_crypto::{BlsPrivateKey, BlsPublicKey, Secp256k1, Secp256k1PrivateKey, ToPublicKey}; use protocol::codec::{hex_decode, ProtocolCodec}; -#[cfg(unix)] -use protocol::tokio::signal::unix as os_impl; use protocol::tokio::{ self, runtime::Builder as RuntimeBuilder, sync::Mutex as AsyncMutex, time::sleep, }; @@ -370,8 +366,7 @@ impl Axon { // Run consensus Self::run_overlord_consensus(metadata, validators, current_block, overlord_consensus); - Self::set_ctrl_c_handle().await; - + components::system::set_ctrl_c_handle().await; components::profiling::stop(); Ok(()) @@ -714,58 +709,6 @@ impl Axon { } }); } - - async fn set_ctrl_c_handle() { - let ctrl_c_handler = tokio::spawn(async { - #[cfg(windows)] - let _ = tokio::signal::ctrl_c().await; - #[cfg(unix)] - { - let mut sigtun_int = os_impl::signal(os_impl::SignalKind::interrupt()).unwrap(); - let mut sigtun_term = os_impl::signal(os_impl::SignalKind::terminate()).unwrap(); - tokio::select! { - _ = sigtun_int.recv() => {} - _ = sigtun_term.recv() => {} - }; - } - }); - - // register channel of panic - let (panic_sender, mut panic_receiver) = tokio::sync::mpsc::channel::<()>(1); - - std::panic::set_hook(Box::new(move |info: &PanicInfo| { - let panic_sender = panic_sender.clone(); - Self::panic_log(info); - panic_sender.try_send(()).expect("panic_receiver is droped"); - })); - - tokio::select! { - _ = ctrl_c_handler => { log::info!("ctrl + c is pressed, quit.") }, - _ = panic_receiver.recv() => { log::info!("child thread panic, quit.") }, - }; - } - - fn panic_log(info: &PanicInfo) { - let backtrace = Backtrace::new(); - let thread = std::thread::current(); - let name = thread.name().unwrap_or("unnamed"); - let location = info.location().unwrap(); // The current implementation always returns Some - let msg = match info.payload().downcast_ref::<&'static str>() { - Some(s) => *s, - None => match info.payload().downcast_ref::() { - Some(s) => &**s, - None => "Box", - }, - }; - log::error!( - target: "panic", "thread '{}' panicked at '{}': {}:{} {:?}", - name, - msg, - location.file(), - location.line(), - backtrace, - ); - } } async fn init_storage>( From c42862a3d887009e7e4cbe5a844bcfd47552a121 Mon Sep 17 00:00:00 2001 From: Boyu Yang Date: Sat, 26 Aug 2023 18:30:54 +0800 Subject: [PATCH 5/8] refactor(core-run): split optional extensions into a single file --- core/run/src/components/extensions.rs | 68 +++++++++++++++++++++++++++ core/run/src/components/mod.rs | 1 + core/run/src/lib.rs | 36 ++------------ 3 files changed, 73 insertions(+), 32 deletions(-) create mode 100644 core/run/src/components/extensions.rs diff --git a/core/run/src/components/extensions.rs b/core/run/src/components/extensions.rs new file mode 100644 index 000000000..352ebb54f --- /dev/null +++ b/core/run/src/components/extensions.rs @@ -0,0 +1,68 @@ +//! Optional extensions. + +use common_apm::{server::run_prometheus_server, tracing::global_tracer_register}; +use common_config_parser::types::{ConfigJaeger, ConfigPrometheus}; +use protocol::{tokio, ProtocolResult}; + +pub(crate) trait ExtensionConfig { + const NAME: &'static str; + + /// Try to start and return the result. + fn try_to_start(&self) -> ProtocolResult; + + /// Try to start and ignore the result. + fn start_if_possible(&self) { + match self.try_to_start() { + Ok(started) => { + if started { + log::info!("{} is started", Self::NAME); + } else { + log::info!("{} is disabled", Self::NAME); + } + } + Err(err) => { + log::error!("failed to start {} since {err}", Self::NAME); + } + } + } +} + +impl ExtensionConfig for Option { + const NAME: &'static str = "Jaeger"; + + fn try_to_start(&self) -> ProtocolResult { + if let Some(ref config) = self { + if let Some(ref addr) = config.tracing_address { + let service_name = config + .service_name + .as_ref() + .map(ToOwned::to_owned) + .unwrap_or("axon".to_owned()); + let tracing_batch_size = config.tracing_batch_size.unwrap_or(50); + global_tracer_register(&service_name, addr.to_owned(), tracing_batch_size); + Ok(true) + } else { + Ok(false) + } + } else { + Ok(false) + } + } +} + +impl ExtensionConfig for Option { + const NAME: &'static str = "Prometheus"; + + fn try_to_start(&self) -> ProtocolResult { + if let Some(ref config) = self { + if let Some(ref addr) = config.listening_address { + tokio::spawn(run_prometheus_server(addr.to_owned())); + Ok(true) + } else { + Ok(false) + } + } else { + Ok(false) + } + } +} diff --git a/core/run/src/components/mod.rs b/core/run/src/components/mod.rs index f882c1bb3..4a559c0ea 100644 --- a/core/run/src/components/mod.rs +++ b/core/run/src/components/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod extensions; pub(crate) mod system; #[cfg(all( diff --git a/core/run/src/lib.rs b/core/run/src/lib.rs index 8501e8ebf..54b58e75e 100644 --- a/core/run/src/lib.rs +++ b/core/run/src/lib.rs @@ -3,9 +3,8 @@ use std::{collections::HashMap, path::Path, sync::Arc, time::Duration}; use common_apm::metrics::mempool::{MEMPOOL_CO_QUEUE_LEN, MEMPOOL_LEN_GAUGE}; -use common_apm::{server::run_prometheus_server, tracing::global_tracer_register}; use common_config_parser::types::spec::{ChainSpec, InitialAccount}; -use common_config_parser::types::{Config, ConfigJaeger, ConfigPrometheus, ConfigRocksDB}; +use common_config_parser::types::{Config, ConfigRocksDB}; use common_crypto::{BlsPrivateKey, BlsPublicKey, Secp256k1, Secp256k1PrivateKey, ToPublicKey}; use protocol::codec::{hex_decode, ProtocolCodec}; @@ -61,6 +60,7 @@ mod key_provider; #[cfg(test)] mod tests; +use components::extensions::ExtensionConfig as _; pub use error::MainError; use key_provider::KeyP; @@ -226,10 +226,10 @@ impl Axon { inner_db: Arc, ) -> ProtocolResult<()> { // Start jaeger - Self::run_jaeger(self.config.jaeger.clone()); + self.config.jaeger.start_if_possible(); // Start prometheus http server - Self::run_prometheus_server(self.config.prometheus.clone()); + self.config.prometheus.start_if_possible(); components::profiling::track_db_process("blockdb", &inner_db); components::profiling::track_current_process(); @@ -649,34 +649,6 @@ impl Axon { .unwrap(); } - fn run_jaeger(config: Option) { - if let Some(jaeger_config) = config { - let service_name = match jaeger_config.service_name { - Some(name) => name, - None => "axon".to_string(), - }; - - let tracing_address = match jaeger_config.tracing_address { - Some(address) => address, - None => std::net::SocketAddr::from(([0, 0, 0, 0], 6831)), - }; - - let tracing_batch_size = jaeger_config.tracing_batch_size.unwrap_or(50); - - global_tracer_register(&service_name, tracing_address, tracing_batch_size); - log::info!("jaeger started"); - }; - } - - fn run_prometheus_server(config: Option) { - if let Some(prometheus_config) = config { - if let Some(prometheus_listening_address) = prometheus_config.listening_address { - tokio::spawn(run_prometheus_server(prometheus_listening_address)); - log::info!("prometheus started"); - } - }; - } - fn run_overlord_consensus( metadata: Metadata, validators: Vec, From e4b2e889d3fa1de43ecb695ce4fab58fdb621525 Mon Sep 17 00:00:00 2001 From: Boyu Yang Date: Sat, 26 Aug 2023 18:36:04 +0800 Subject: [PATCH 6/8] refactor(core-run): split network into a single file --- core/run/src/components/mod.rs | 1 + core/run/src/components/network.rs | 147 ++++++++++++++++++++++++++ core/run/src/lib.rs | 161 ++--------------------------- 3 files changed, 158 insertions(+), 151 deletions(-) create mode 100644 core/run/src/components/network.rs diff --git a/core/run/src/components/mod.rs b/core/run/src/components/mod.rs index 4a559c0ea..4b4de63f0 100644 --- a/core/run/src/components/mod.rs +++ b/core/run/src/components/mod.rs @@ -1,4 +1,5 @@ pub(crate) mod extensions; +pub(crate) mod network; pub(crate) mod system; #[cfg(all( diff --git a/core/run/src/components/network.rs b/core/run/src/components/network.rs new file mode 100644 index 000000000..352a1103d --- /dev/null +++ b/core/run/src/components/network.rs @@ -0,0 +1,147 @@ +//! Configure the network service. + +use std::sync::Arc; + +use core_consensus::message::{ + ChokeMessageHandler, ProposalMessageHandler, PullBlockRpcHandler, PullProofRpcHandler, + PullTxsRpcHandler, QCMessageHandler, RemoteHeightMessageHandler, VoteMessageHandler, + BROADCAST_HEIGHT, END_GOSSIP_AGGREGATED_VOTE, END_GOSSIP_SIGNED_CHOKE, + END_GOSSIP_SIGNED_PROPOSAL, END_GOSSIP_SIGNED_VOTE, RPC_RESP_SYNC_PULL_BLOCK, + RPC_RESP_SYNC_PULL_PROOF, RPC_RESP_SYNC_PULL_TXS, RPC_SYNC_PULL_BLOCK, RPC_SYNC_PULL_PROOF, + RPC_SYNC_PULL_TXS, +}; +use core_consensus::OverlordSynchronization; +use core_db::RocksAdapter; +use core_mempool::{ + NewTxsHandler, PullTxsHandler, END_GOSSIP_NEW_TXS, RPC_PULL_TXS, RPC_RESP_PULL_TXS, + RPC_RESP_PULL_TXS_SYNC, +}; +use core_network::{KeyProvider, NetworkService, PeerId, PeerIdExt}; +use core_storage::ImplStorage; +use protocol::{ + traits::{Consensus, Context, MemPool, Network, SynchronizationAdapter}, + types::ValidatorExtend, + ProtocolResult, +}; + +pub(crate) trait NetworkServiceExt { + fn tag_consensus(&self, validators: &[ValidatorExtend]) -> ProtocolResult<()>; + + fn register_mempool_endpoint( + &mut self, + mempool: &Arc, + ) -> ProtocolResult<()>; + + fn register_consensus_endpoint( + &mut self, + overlord_consensus: &Arc, + ) -> ProtocolResult<()>; + + fn register_synchronization_endpoint( + &mut self, + synchronization: &Arc>, + ) -> ProtocolResult<()>; + + fn register_storage_endpoint( + &mut self, + storage: &Arc>, + ) -> ProtocolResult<()>; + + fn register_rpc(&mut self) -> ProtocolResult<()>; +} + +impl NetworkServiceExt for NetworkService +where + K: KeyProvider, +{ + fn tag_consensus(&self, validators: &[ValidatorExtend]) -> ProtocolResult<()> { + let peer_ids = validators + .iter() + .map(|v| PeerId::from_pubkey_bytes(v.pub_key.as_bytes()).map(PeerIdExt::into_bytes_ext)) + .collect::, _>>() + .unwrap(); + self.handle().tag_consensus(Context::new(), peer_ids) + } + + fn register_mempool_endpoint( + &mut self, + mempool: &Arc, + ) -> ProtocolResult<()> { + // register broadcast new transaction + self.register_endpoint_handler( + END_GOSSIP_NEW_TXS, + NewTxsHandler::new(Arc::clone(mempool)), + )?; + // register pull txs from other node + self.register_endpoint_handler( + RPC_PULL_TXS, + PullTxsHandler::new(Arc::new(self.handle()), Arc::clone(mempool)), + )?; + Ok(()) + } + + fn register_consensus_endpoint( + &mut self, + overlord_consensus: &Arc, + ) -> ProtocolResult<()> { + // register consensus + self.register_endpoint_handler( + END_GOSSIP_SIGNED_PROPOSAL, + ProposalMessageHandler::new(Arc::clone(overlord_consensus)), + )?; + self.register_endpoint_handler( + END_GOSSIP_AGGREGATED_VOTE, + QCMessageHandler::new(Arc::clone(overlord_consensus)), + )?; + self.register_endpoint_handler( + END_GOSSIP_SIGNED_VOTE, + VoteMessageHandler::new(Arc::clone(overlord_consensus)), + )?; + self.register_endpoint_handler( + END_GOSSIP_SIGNED_CHOKE, + ChokeMessageHandler::new(Arc::clone(overlord_consensus)), + )?; + Ok(()) + } + + fn register_synchronization_endpoint( + &mut self, + synchronization: &Arc>, + ) -> ProtocolResult<()> { + self.register_endpoint_handler( + BROADCAST_HEIGHT, + RemoteHeightMessageHandler::new(Arc::clone(synchronization)), + )?; + Ok(()) + } + + fn register_storage_endpoint( + &mut self, + storage: &Arc>, + ) -> ProtocolResult<()> { + let handle = Arc::new(self.handle()); + // register storage + self.register_endpoint_handler( + RPC_SYNC_PULL_BLOCK, + PullBlockRpcHandler::new(Arc::clone(&handle), Arc::clone(storage)), + )?; + self.register_endpoint_handler( + RPC_SYNC_PULL_PROOF, + PullProofRpcHandler::new(Arc::clone(&handle), Arc::clone(storage)), + )?; + self.register_endpoint_handler( + RPC_SYNC_PULL_TXS, + PullTxsRpcHandler::new(Arc::clone(&handle), Arc::clone(storage)), + )?; + Ok(()) + } + + fn register_rpc(&mut self) -> ProtocolResult<()> { + self.register_rpc_response(RPC_RESP_PULL_TXS)?; + self.register_rpc_response(RPC_RESP_PULL_TXS_SYNC)?; + self.register_rpc_response(RPC_RESP_SYNC_PULL_BLOCK)?; + self.register_rpc_response(RPC_RESP_SYNC_PULL_PROOF)?; + self.register_rpc_response(RPC_RESP_SYNC_PULL_TXS)?; + Ok(()) + } +} diff --git a/core/run/src/lib.rs b/core/run/src/lib.rs index 54b58e75e..4adc9b7fb 100644 --- a/core/run/src/lib.rs +++ b/core/run/src/lib.rs @@ -12,8 +12,7 @@ use protocol::tokio::{ self, runtime::Builder as RuntimeBuilder, sync::Mutex as AsyncMutex, time::sleep, }; use protocol::traits::{ - Consensus, Context, Executor, Gossip, MemPool, Network, NodeInfo, PeerTrust, ReadOnlyStorage, - Rpc, Storage, SynchronizationAdapter, + Context, Executor, Gossip, MemPool, Network, NodeInfo, PeerTrust, ReadOnlyStorage, Rpc, Storage, }; use protocol::trie::Trie; use protocol::types::{ @@ -23,14 +22,6 @@ use protocol::types::{ use protocol::{lazy::CHAIN_ID, trie::DB as TrieDB, ProtocolResult}; use core_api::{jsonrpc::run_jsonrpc_server, DefaultAPIAdapter}; -use core_consensus::message::{ - ChokeMessageHandler, ProposalMessageHandler, PullBlockRpcHandler, PullProofRpcHandler, - PullTxsRpcHandler, QCMessageHandler, RemoteHeightMessageHandler, VoteMessageHandler, - BROADCAST_HEIGHT, END_GOSSIP_AGGREGATED_VOTE, END_GOSSIP_SIGNED_CHOKE, - END_GOSSIP_SIGNED_PROPOSAL, END_GOSSIP_SIGNED_VOTE, RPC_RESP_SYNC_PULL_BLOCK, - RPC_RESP_SYNC_PULL_PROOF, RPC_RESP_SYNC_PULL_TXS, RPC_SYNC_PULL_BLOCK, RPC_SYNC_PULL_PROOF, - RPC_SYNC_PULL_TXS, -}; use core_consensus::status::{CurrentStatus, StatusAgent}; use core_consensus::{ util::OverlordCrypto, ConsensusWal, DurationConfig, OverlordConsensus, @@ -42,13 +33,8 @@ use core_executor::{ AxonExecutor, AxonExecutorApplyAdapter, AxonExecutorReadOnlyAdapter, MPTTrie, RocksTrieDB, }; use core_interoperation::InteroperationImpl; -use core_mempool::{ - DefaultMemPoolAdapter, MemPoolImpl, NewTxsHandler, PullTxsHandler, END_GOSSIP_NEW_TXS, - RPC_PULL_TXS, RPC_RESP_PULL_TXS, RPC_RESP_PULL_TXS_SYNC, -}; -use core_network::{ - observe_listen_port_occupancy, NetworkConfig, NetworkService, PeerId, PeerIdExt, -}; +use core_mempool::{DefaultMemPoolAdapter, MemPoolImpl}; +use core_network::{observe_listen_port_occupancy, NetworkConfig, NetworkService}; use core_storage::ImplStorage; pub use core_network::{KeyProvider, SecioKeyPair}; @@ -60,7 +46,7 @@ mod key_provider; #[cfg(test)] mod tests; -use components::extensions::ExtensionConfig as _; +use components::{extensions::ExtensionConfig as _, network::NetworkServiceExt as _}; pub use error::MainError; use key_provider::KeyP; @@ -333,14 +319,14 @@ impl Axon { lock, )); - self.tag_consensus(&network_service, &metadata.verifier_list); + network_service.tag_consensus(&metadata.verifier_list)?; // register endpoints to network service - self.register_mempool_endpoint(&mut network_service, &mempool); - self.register_consensus_endpoint(&mut network_service, &overlord_consensus); - self.register_synchronization_endpoint(&mut network_service, &synchronization); - self.register_storage_endpoint(&mut network_service, &storage); - self.register_rpc(&mut network_service); + network_service.register_mempool_endpoint(&mempool)?; + network_service.register_consensus_endpoint(&overlord_consensus)?; + network_service.register_synchronization_endpoint(&synchronization)?; + network_service.register_storage_endpoint(&storage)?; + network_service.register_rpc()?; let network_handle = network_service.handle(); @@ -522,133 +508,6 @@ impl Axon { ) } - fn tag_consensus( - &self, - network_service: &NetworkService, - validators: &[ValidatorExtend], - ) { - let peer_ids = validators - .iter() - .map(|v| PeerId::from_pubkey_bytes(v.pub_key.as_bytes()).map(PeerIdExt::into_bytes_ext)) - .collect::, _>>() - .unwrap(); - - network_service - .handle() - .tag_consensus(Context::new(), peer_ids) - .unwrap(); - } - - fn register_mempool_endpoint( - &self, - network_service: &mut NetworkService, - mempool: &Arc, - ) { - // register broadcast new transaction - network_service - .register_endpoint_handler(END_GOSSIP_NEW_TXS, NewTxsHandler::new(Arc::clone(mempool))) - .unwrap(); - - // register pull txs from other node - network_service - .register_endpoint_handler( - RPC_PULL_TXS, - PullTxsHandler::new(Arc::new(network_service.handle()), Arc::clone(mempool)), - ) - .unwrap(); - } - - fn register_consensus_endpoint( - &self, - network_service: &mut NetworkService, - overlord_consensus: &Arc, - ) { - // register consensus - network_service - .register_endpoint_handler( - END_GOSSIP_SIGNED_PROPOSAL, - ProposalMessageHandler::new(Arc::clone(overlord_consensus)), - ) - .unwrap(); - network_service - .register_endpoint_handler( - END_GOSSIP_AGGREGATED_VOTE, - QCMessageHandler::new(Arc::clone(overlord_consensus)), - ) - .unwrap(); - network_service - .register_endpoint_handler( - END_GOSSIP_SIGNED_VOTE, - VoteMessageHandler::new(Arc::clone(overlord_consensus)), - ) - .unwrap(); - network_service - .register_endpoint_handler( - END_GOSSIP_SIGNED_CHOKE, - ChokeMessageHandler::new(Arc::clone(overlord_consensus)), - ) - .unwrap(); - } - - fn register_synchronization_endpoint( - &self, - network_service: &mut NetworkService, - synchronization: &Arc>, - ) { - network_service - .register_endpoint_handler( - BROADCAST_HEIGHT, - RemoteHeightMessageHandler::new(Arc::clone(synchronization)), - ) - .unwrap(); - } - - fn register_storage_endpoint( - &self, - network_service: &mut NetworkService, - storage: &Arc>, - ) { - // register storage - network_service - .register_endpoint_handler( - RPC_SYNC_PULL_BLOCK, - PullBlockRpcHandler::new(Arc::new(network_service.handle()), Arc::clone(storage)), - ) - .unwrap(); - - network_service - .register_endpoint_handler( - RPC_SYNC_PULL_PROOF, - PullProofRpcHandler::new(Arc::new(network_service.handle()), Arc::clone(storage)), - ) - .unwrap(); - - network_service - .register_endpoint_handler( - RPC_SYNC_PULL_TXS, - PullTxsRpcHandler::new(Arc::new(network_service.handle()), Arc::clone(storage)), - ) - .unwrap(); - } - - fn register_rpc(&self, network_service: &mut NetworkService) { - network_service - .register_rpc_response(RPC_RESP_PULL_TXS) - .unwrap(); - network_service - .register_rpc_response(RPC_RESP_PULL_TXS_SYNC) - .unwrap(); - network_service - .register_rpc_response(RPC_RESP_SYNC_PULL_BLOCK) - .unwrap(); - network_service - .register_rpc_response(RPC_RESP_SYNC_PULL_PROOF) - .unwrap(); - network_service - .register_rpc_response(RPC_RESP_SYNC_PULL_TXS) - .unwrap(); - } - fn run_overlord_consensus( metadata: Metadata, validators: Vec, From 02734473616ab864b84036fb90c7bf812b9bcea2 Mon Sep 17 00:00:00 2001 From: Boyu Yang Date: Sun, 27 Aug 2023 14:32:17 +0800 Subject: [PATCH 7/8] refactor(core-run): split storage into a single file --- Cargo.lock | 1 + core/run/src/components/mod.rs | 1 + core/run/src/components/storage.rs | 80 +++++++++++++++++++++++ core/run/src/lib.rs | 101 +++++------------------------ core/run/src/tests.rs | 1 - protocol/Cargo.toml | 1 + protocol/src/types/executor.rs | 1 + protocol/src/types/mod.rs | 2 +- 8 files changed, 102 insertions(+), 86 deletions(-) create mode 100644 core/run/src/components/storage.rs diff --git a/Cargo.lock b/Cargo.lock index 9427564b6..2175bb6f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -329,6 +329,7 @@ dependencies = [ "ethers-core", "evm", "faster-hex 0.8.0", + "hasher", "hex", "lazy_static", "ophelia", diff --git a/core/run/src/components/mod.rs b/core/run/src/components/mod.rs index 4b4de63f0..997462230 100644 --- a/core/run/src/components/mod.rs +++ b/core/run/src/components/mod.rs @@ -1,5 +1,6 @@ pub(crate) mod extensions; pub(crate) mod network; +pub(crate) mod storage; pub(crate) mod system; #[cfg(all( diff --git a/core/run/src/components/storage.rs b/core/run/src/components/storage.rs new file mode 100644 index 000000000..2f488ee54 --- /dev/null +++ b/core/run/src/components/storage.rs @@ -0,0 +1,80 @@ +use std::{path::Path, sync::Arc}; + +use common_config_parser::types::{spec::InitialAccount, ConfigRocksDB}; +use core_db::{RocksAdapter, RocksDB}; +use core_executor::{MPTTrie, RocksTrieDB}; +use core_storage::ImplStorage; +use protocol::{ + async_trait, + codec::ProtocolCodec, + traits::{Context, Storage}, + trie::{self, Trie}, + types::{Account, Block, ExecResp, HasherKeccak, RichBlock, NIL_DATA, RLP_NULL}, + ProtocolResult, +}; + +pub(crate) fn init_storage>( + config: &ConfigRocksDB, + rocksdb_path: P, + triedb_cache_size: usize, +) -> ProtocolResult<( + Arc>, + Arc, + Arc, +)> { + let adapter = Arc::new(RocksAdapter::new(rocksdb_path, config.clone())?); + let inner_db = adapter.inner_db(); + let trie_db = Arc::new(RocksTrieDB::new_evm(adapter.inner_db(), triedb_cache_size)); + let storage = Arc::new(ImplStorage::new(adapter, config.cache_size)); + Ok((storage, trie_db, inner_db)) +} + +#[async_trait] +pub(crate) trait StorageExt: Storage { + async fn try_load_genesis(&self) -> ProtocolResult> { + self.get_block(Context::new(), 0).await.or_else(|e| { + if e.to_string().contains("GetNone") { + Ok(None) + } else { + Err(e) + } + }) + } + + async fn save_block(&self, rich: &RichBlock, resp: &ExecResp) -> ProtocolResult<()> { + self.update_latest_proof(Context::new(), rich.block.header.proof.clone()) + .await?; + self.insert_block(Context::new(), rich.block.clone()) + .await?; + self.insert_transactions(Context::new(), rich.block.header.number, rich.txs.clone()) + .await?; + let (receipts, _logs) = rich.generate_receipts_and_logs(resp); + self.insert_receipts(Context::new(), rich.block.header.number, receipts) + .await?; + Ok(()) + } +} + +impl StorageExt for ImplStorage {} + +pub(crate) trait TrieExt: Trie + Sized +where + D: trie::DB, + H: trie::Hasher, +{ + fn insert_accounts(mut self, accounts: &[InitialAccount]) -> ProtocolResult { + for account in accounts { + let raw_account = Account { + nonce: 0u64.into(), + balance: account.balance, + storage_root: RLP_NULL, + code_hash: NIL_DATA, + } + .encode()?; + self.insert(account.address.as_bytes().to_vec(), raw_account.to_vec())?; + } + Ok(self) + } +} + +impl TrieExt for MPTTrie where D: trie::DB {} diff --git a/core/run/src/lib.rs b/core/run/src/lib.rs index 4adc9b7fb..eeec8f615 100644 --- a/core/run/src/lib.rs +++ b/core/run/src/lib.rs @@ -1,23 +1,22 @@ #![allow(clippy::uninlined_format_args, clippy::mutable_key_type)] -use std::{collections::HashMap, path::Path, sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use common_apm::metrics::mempool::{MEMPOOL_CO_QUEUE_LEN, MEMPOOL_LEN_GAUGE}; use common_config_parser::types::spec::{ChainSpec, InitialAccount}; -use common_config_parser::types::{Config, ConfigRocksDB}; +use common_config_parser::types::Config; use common_crypto::{BlsPrivateKey, BlsPublicKey, Secp256k1, Secp256k1PrivateKey, ToPublicKey}; -use protocol::codec::{hex_decode, ProtocolCodec}; +use protocol::codec::hex_decode; use protocol::tokio::{ self, runtime::Builder as RuntimeBuilder, sync::Mutex as AsyncMutex, time::sleep, }; use protocol::traits::{ Context, Executor, Gossip, MemPool, Network, NodeInfo, PeerTrust, ReadOnlyStorage, Rpc, Storage, }; -use protocol::trie::Trie; use protocol::types::{ - Account, Block, ExecResp, MerkleRoot, Metadata, Proposal, RichBlock, SignedTransaction, - Validator, ValidatorExtend, H256, NIL_DATA, RLP_NULL, + Block, ExecResp, MerkleRoot, Metadata, Proposal, RichBlock, SignedTransaction, Validator, + ValidatorExtend, H256, }; use protocol::{lazy::CHAIN_ID, trie::DB as TrieDB, ProtocolResult}; @@ -46,7 +45,11 @@ mod key_provider; #[cfg(test)] mod tests; -use components::{extensions::ExtensionConfig as _, network::NetworkServiceExt as _}; +use components::{ + extensions::ExtensionConfig as _, + network::NetworkServiceExt as _, + storage::{init_storage, StorageExt as _, TrieExt as _}, +}; pub use error::MainError; use key_provider::KeyP; @@ -83,9 +86,8 @@ impl Axon { &self.config.rocksdb, self.config.data_path_for_rocksdb(), self.config.executor.triedb_cache_size, - ) - .await?; - if let Some(genesis) = self.try_load_genesis(&storage).await? { + )?; + if let Some(genesis) = storage.try_load_genesis().await? { log::info!("The Genesis block has been initialized."); self.apply_genesis_after_checks(&genesis).await?; } else { @@ -99,19 +101,6 @@ impl Axon { Ok(()) } - async fn try_load_genesis( - &self, - storage: &Arc>, - ) -> ProtocolResult> { - storage.get_block(Context::new(), 0).await.or_else(|e| { - if e.to_string().contains("GetNone") { - Ok(None) - } else { - Err(e) - } - }) - } - async fn create_genesis( &mut self, storage: &Arc>, @@ -137,7 +126,7 @@ impl Axon { log::info!("The genesis block is created {:?}", self.genesis.block); - save_block(storage, &self.genesis, &resp).await?; + storage.save_block(&self.genesis, &resp).await?; Ok(()) } @@ -153,8 +142,7 @@ impl Axon { &self.config.rocksdb, path_block, self.config.executor.triedb_cache_size, - ) - .await?; + )?; let resp = execute_transactions( &self.genesis, @@ -542,39 +530,6 @@ impl Axon { } } -async fn init_storage>( - config: &ConfigRocksDB, - rocksdb_path: P, - triedb_cache_size: usize, -) -> ProtocolResult<( - Arc>, - Arc, - Arc, -)> { - let adapter = Arc::new(RocksAdapter::new(rocksdb_path, config.clone())?); - let inner_db = adapter.inner_db(); - let trie_db = Arc::new(RocksTrieDB::new_evm(adapter.inner_db(), triedb_cache_size)); - let storage = Arc::new(ImplStorage::new(adapter, config.cache_size)); - Ok((storage, trie_db, inner_db)) -} - -fn insert_accounts(mpt: &mut MPTTrie, accounts: &[InitialAccount]) -> ProtocolResult<()> -where - DB: TrieDB, -{ - for account in accounts { - let raw_account = Account { - nonce: 0u64.into(), - balance: account.balance, - storage_root: RLP_NULL, - code_hash: NIL_DATA, - } - .encode()?; - mpt.insert(account.address.as_bytes().to_vec(), raw_account.to_vec())?; - } - Ok(()) -} - fn execute_transactions( rich: &RichBlock, storage: &Arc, @@ -585,11 +540,9 @@ fn execute_transactions( where S: Storage + 'static, { - let state_root = { - let mut mpt = MPTTrie::new(Arc::clone(trie_db)); - insert_accounts(&mut mpt, accounts).expect("insert accounts"); - mpt.commit()? - }; + let state_root = MPTTrie::new(Arc::clone(&trie_db)) + .insert_accounts(accounts)? + .commit()?; let executor = AxonExecutor::default(); let mut backend = AxonExecutorApplyAdapter::from_root( state_root, @@ -613,23 +566,3 @@ where Ok(resp) } - -async fn save_block(storage: &Arc, rich: &RichBlock, resp: &ExecResp) -> ProtocolResult<()> -where - S: Storage + 'static, -{ - storage - .update_latest_proof(Context::new(), rich.block.header.proof.clone()) - .await?; - storage - .insert_block(Context::new(), rich.block.clone()) - .await?; - storage - .insert_transactions(Context::new(), rich.block.header.number, rich.txs.clone()) - .await?; - let (receipts, _logs) = rich.generate_receipts_and_logs(resp); - storage - .insert_receipts(Context::new(), rich.block.header.number, receipts) - .await?; - Ok(()) -} diff --git a/core/run/src/tests.rs b/core/run/src/tests.rs index afb772dad..7ea53ca32 100644 --- a/core/run/src/tests.rs +++ b/core/run/src/tests.rs @@ -134,7 +134,6 @@ async fn check_genesis_data<'a>(case: &TestCase<'a>) { path_block, config.executor.triedb_cache_size, ) - .await .expect("initialize storage"); let resp = execute_transactions( diff --git a/protocol/Cargo.toml b/protocol/Cargo.toml index a145f8310..7e4b6b444 100644 --- a/protocol/Cargo.toml +++ b/protocol/Cargo.toml @@ -21,6 +21,7 @@ ethereum-types = { version = "0.14", features = ["arbitrary", "codec", "rlp", "s ethers-core = "2.0" evm = { version = "0.37", features = ["with-serde"] } faster-hex = "0.8" +hasher = "0.1" lazy_static = "1.4" ophelia = "0.3" overlord = "0.4" diff --git a/protocol/src/types/executor.rs b/protocol/src/types/executor.rs index ca9ac4fa6..26be301ca 100644 --- a/protocol/src/types/executor.rs +++ b/protocol/src/types/executor.rs @@ -1,5 +1,6 @@ pub use ethereum::{AccessList, AccessListItem, Account}; pub use evm::{backend::Log, Config, ExitError, ExitFatal, ExitReason, ExitRevert, ExitSucceed}; +pub use hasher::HasherKeccak; use rlp_derive::{RlpDecodable, RlpEncodable}; diff --git a/protocol/src/types/mod.rs b/protocol/src/types/mod.rs index 490a0c25c..e8f40c7ba 100644 --- a/protocol/src/types/mod.rs +++ b/protocol/src/types/mod.rs @@ -7,7 +7,7 @@ pub use ckb_client::*; pub use evm::{backend::*, ExitError, ExitRevert, ExitSucceed}; pub use executor::{ logs_bloom, AccessList, AccessListItem, Account, Config, ExecResp, ExecutorContext, ExitReason, - TxResp, + HasherKeccak, TxResp, }; pub use interoperation::*; pub use primitive::*; From 68e2b0e6a2eb9a04a9cc65fdd62401b602a3a0eb Mon Sep 17 00:00:00 2001 From: Boyu Yang Date: Tue, 29 Aug 2023 21:36:42 +0800 Subject: [PATCH 8/8] refactor(core-run): only required parameters in function signatures --- core/run/src/components/storage.rs | 48 +- core/run/src/lib.rs | 818 ++++++++++++++--------------- core/run/src/tests.rs | 18 +- 3 files changed, 420 insertions(+), 464 deletions(-) diff --git a/core/run/src/components/storage.rs b/core/run/src/components/storage.rs index 2f488ee54..845a88190 100644 --- a/core/run/src/components/storage.rs +++ b/core/run/src/components/storage.rs @@ -13,20 +13,40 @@ use protocol::{ ProtocolResult, }; -pub(crate) fn init_storage>( - config: &ConfigRocksDB, - rocksdb_path: P, - triedb_cache_size: usize, -) -> ProtocolResult<( - Arc>, - Arc, - Arc, -)> { - let adapter = Arc::new(RocksAdapter::new(rocksdb_path, config.clone())?); - let inner_db = adapter.inner_db(); - let trie_db = Arc::new(RocksTrieDB::new_evm(adapter.inner_db(), triedb_cache_size)); - let storage = Arc::new(ImplStorage::new(adapter, config.cache_size)); - Ok((storage, trie_db, inner_db)) +pub(crate) struct DatabaseGroup { + storage: Arc>, + trie_db: Arc, + inner_db: Arc, +} + +impl DatabaseGroup { + pub(crate) fn new>( + config: &ConfigRocksDB, + rocksdb_path: P, + triedb_cache_size: usize, + ) -> ProtocolResult { + let adapter = Arc::new(RocksAdapter::new(rocksdb_path, config.clone())?); + let inner_db = adapter.inner_db(); + let trie_db = Arc::new(RocksTrieDB::new_evm(adapter.inner_db(), triedb_cache_size)); + let storage = Arc::new(ImplStorage::new(adapter, config.cache_size)); + Ok(Self { + storage, + trie_db, + inner_db, + }) + } + + pub(crate) fn storage(&self) -> Arc> { + Arc::clone(&self.storage) + } + + pub(crate) fn trie_db(&self) -> Arc { + Arc::clone(&self.trie_db) + } + + pub(crate) fn inner_db(&self) -> Arc { + Arc::clone(&self.inner_db) + } } #[async_trait] diff --git a/core/run/src/lib.rs b/core/run/src/lib.rs index eeec8f615..09c308547 100644 --- a/core/run/src/lib.rs +++ b/core/run/src/lib.rs @@ -4,10 +4,9 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use common_apm::metrics::mempool::{MEMPOOL_CO_QUEUE_LEN, MEMPOOL_LEN_GAUGE}; use common_config_parser::types::spec::{ChainSpec, InitialAccount}; -use common_config_parser::types::Config; +use common_config_parser::types::{Config, ConfigMempool, ConfigRocksDB}; use common_crypto::{BlsPrivateKey, BlsPublicKey, Secp256k1, Secp256k1PrivateKey, ToPublicKey}; -use protocol::codec::hex_decode; use protocol::tokio::{ self, runtime::Builder as RuntimeBuilder, sync::Mutex as AsyncMutex, time::sleep, }; @@ -15,8 +14,8 @@ use protocol::traits::{ Context, Executor, Gossip, MemPool, Network, NodeInfo, PeerTrust, ReadOnlyStorage, Rpc, Storage, }; use protocol::types::{ - Block, ExecResp, MerkleRoot, Metadata, Proposal, RichBlock, SignedTransaction, Validator, - ValidatorExtend, H256, + Block, ExecResp, Header, Metadata, Proposal, RichBlock, SignedTransaction, Validator, + ValidatorExtend, }; use protocol::{lazy::CHAIN_ID, trie::DB as TrieDB, ProtocolResult}; @@ -26,15 +25,11 @@ use core_consensus::{ util::OverlordCrypto, ConsensusWal, DurationConfig, OverlordConsensus, OverlordConsensusAdapter, OverlordSynchronization, SignedTxsWAL, }; -use core_db::{RocksAdapter, RocksDB}; use core_executor::system_contract::{self, metadata::MetadataHandle}; -use core_executor::{ - AxonExecutor, AxonExecutorApplyAdapter, AxonExecutorReadOnlyAdapter, MPTTrie, RocksTrieDB, -}; +use core_executor::{AxonExecutor, AxonExecutorApplyAdapter, AxonExecutorReadOnlyAdapter, MPTTrie}; use core_interoperation::InteroperationImpl; use core_mempool::{DefaultMemPoolAdapter, MemPoolImpl}; use core_network::{observe_listen_port_occupancy, NetworkConfig, NetworkService}; -use core_storage::ImplStorage; pub use core_network::{KeyProvider, SecioKeyPair}; @@ -48,18 +43,17 @@ mod tests; use components::{ extensions::ExtensionConfig as _, network::NetworkServiceExt as _, - storage::{init_storage, StorageExt as _, TrieExt as _}, + storage::{DatabaseGroup, StorageExt as _, TrieExt as _}, }; pub use error::MainError; use key_provider::KeyP; #[derive(Debug)] pub struct Axon { - version: String, - config: Config, - spec: ChainSpec, - genesis: RichBlock, - state_root: MerkleRoot, + version: String, + config: Config, + spec: ChainSpec, + genesis: RichBlock, } impl Axon { @@ -69,12 +63,16 @@ impl Axon { config, spec, genesis, - state_root: MerkleRoot::default(), } } - pub fn run(mut self, key_provider: Option) -> ProtocolResult<()> { - components::profiling::start(); + pub fn run(self, key_provider: Option) -> ProtocolResult<()> { + let Self { + version, + config, + spec, + genesis, + } = self; let rt = RuntimeBuilder::new_multi_thread() .enable_all() @@ -82,476 +80,420 @@ impl Axon { .expect("new tokio runtime"); rt.block_on(async move { - let (storage, trie_db, inner_db) = init_storage( - &self.config.rocksdb, - self.config.data_path_for_rocksdb(), - self.config.executor.triedb_cache_size, + let db_group = DatabaseGroup::new( + &config.rocksdb, + config.data_path_for_rocksdb(), + config.executor.triedb_cache_size, )?; - if let Some(genesis) = storage.try_load_genesis().await? { - log::info!("The Genesis block has been initialized."); - self.apply_genesis_after_checks(&genesis).await?; + if let Some(loaded_genesis) = db_group.storage().try_load_genesis().await? { + log::info!("Check genesis block."); + let genesis = execute_genesis_temporarily( + genesis, + spec, + &config.rocksdb, + config.executor.triedb_cache_size, + ) + .await?; + if genesis.block != loaded_genesis { + let err_msg = format!( + "The user provided genesis (hash: {:#x}) is NOT \ + the same as the genesis in storage (hash: {:#x})", + genesis.block.hash(), + loaded_genesis.hash() + ); + return Err(MainError::Other(err_msg).into()); + } } else { - self.create_genesis(&storage, &trie_db, &inner_db).await?; + log::info!("Initialize genesis block."); + let _genesis = execute_genesis(genesis, spec, &db_group).await?; } - - self.start(key_provider, storage, trie_db, inner_db).await + start(version, config, key_provider, &db_group).await })?; rt.shutdown_timeout(std::time::Duration::from_secs(1)); Ok(()) } +} - async fn create_genesis( - &mut self, - storage: &Arc>, - trie_db: &Arc, - inner_db: &Arc, - ) -> ProtocolResult<()> { - let resp = execute_transactions( - &self.genesis, - storage, - trie_db, - inner_db, - &self.spec.accounts, - )?; - - log::info!( - "Execute the genesis distribute success, genesis state root {:?}, response {:?}", - resp.state_root, - resp - ); - - self.state_root = resp.state_root; - self.apply_genesis_data(resp.state_root, resp.receipt_root)?; - - log::info!("The genesis block is created {:?}", self.genesis.block); - - storage.save_block(&self.genesis, &resp).await?; - - Ok(()) - } - - async fn apply_genesis_after_checks(&mut self, loaded_genesis: &Block) -> ProtocolResult<()> { - let tmp_dir = tempfile::tempdir().map_err(|err| { - let err_msg = format!("failed to create temporary directory since {err:?}"); - MainError::Other(err_msg) +async fn start( + version: String, + config: Config, + key_provider: Option, + db_group: &DatabaseGroup, +) -> ProtocolResult<()> { + let storage = db_group.storage(); + let trie_db = db_group.trie_db(); + let inner_db = db_group.inner_db(); + + components::profiling::start(); + components::profiling::track_db_process("blockdb", &inner_db); + components::profiling::track_current_process(); + + // Start jaeger + config.jaeger.start_if_possible(); + + // Start prometheus http server + config.prometheus.start_if_possible(); + + log::info!("node starts"); + + observe_listen_port_occupancy(&[config.network.listening_address.clone()]).await?; + + // Init Block db and get the current block + let current_block = storage.get_latest_block(Context::new()).await?; + let current_state_root = current_block.header.state_root; + + log::info!("At block number {}", current_block.header.number + 1); + + // Init network + let mut network_service = + init_network_service(&config, current_block.header.chain_id, key_provider)?; + + // Init full transactions wal + let txs_wal_path = config + .data_path_for_txs_wal() + .to_str() + .map(ToOwned::to_owned) + .ok_or_else(|| { + let msg = format!( + "failed to convert WAL path {} to string", + config.data_path_for_txs_wal().display() + ); + MainError::Other(msg) })?; - let path_block = tmp_dir.path().join("block"); - - let (storage, trie_db, inner_db) = init_storage( - &self.config.rocksdb, - path_block, - self.config.executor.triedb_cache_size, - )?; + let txs_wal = Arc::new(SignedTxsWAL::new(txs_wal_path)); - let resp = execute_transactions( - &self.genesis, - &storage, - &trie_db, - &inner_db, - &self.spec.accounts, + // Init system contract + if current_block.header.number != 0 { + let mut backend = AxonExecutorApplyAdapter::from_root( + current_block.header.state_root, + Arc::clone(&trie_db), + Arc::clone(&storage), + Proposal::new_without_state_root(¤t_block.header).into(), )?; - self.apply_genesis_data(resp.state_root, resp.receipt_root)?; - - let user_provided_genesis = &self.genesis.block; - if user_provided_genesis != loaded_genesis { - let err_msg = format!( - "The user provided genesis (hash: {:#x}) is NOT \ - the same as the genesis in storage (hash: {:#x})", - user_provided_genesis.hash(), - loaded_genesis.hash() - ); - return Err(MainError::Other(err_msg).into()); - } - - Ok(()) + system_contract::init(inner_db, &mut backend); } - fn apply_genesis_data(&mut self, state_root: H256, receipts_root: H256) -> ProtocolResult<()> { - if self.genesis.block.header.state_root.is_zero() { - self.genesis.block.header.state_root = state_root; - } else if self.genesis.block.header.state_root != state_root { - let errmsg = format!( - "The state root of genesis block which user provided is incorrect, \ - if you don't know it, you can just set it as {:#x}.", - H256::default() - ); - return Err(MainError::Other(errmsg).into()); - } - if self.genesis.block.header.receipts_root.is_zero() { - self.genesis.block.header.receipts_root = receipts_root; - } else if self.genesis.block.header.receipts_root != receipts_root { - let errmsg = format!( - "The receipts root of genesis block which user provided is incorrect, \ - if you don't know it, you can just set it as {:#x}.", - H256::default() - ); - return Err(MainError::Other(errmsg).into()); - } - Ok(()) - } - - pub async fn start( - self, - key_provider: Option, - storage: Arc>, - trie_db: Arc, - inner_db: Arc, - ) -> ProtocolResult<()> { - // Start jaeger - self.config.jaeger.start_if_possible(); - - // Start prometheus http server - self.config.prometheus.start_if_possible(); - - components::profiling::track_db_process("blockdb", &inner_db); - components::profiling::track_current_process(); - - log::info!("node starts"); - - observe_listen_port_occupancy(&[self.config.network.listening_address.clone()]).await?; - - // Init Block db and get the current block - let current_block = storage.get_latest_block(Context::new()).await?; - let current_state_root = current_block.header.state_root; - - log::info!("At block number {}", current_block.header.number + 1); - - // Init network - let mut network_service = self.init_network_service(key_provider); - - // Init full transactions wal - let txs_wal_path = self - .config - .data_path_for_txs_wal() - .to_str() - .unwrap() - .to_string(); - let txs_wal = Arc::new(SignedTxsWAL::new(txs_wal_path)); - - // Init system contract - if current_block.header.number != 0 { - let mut backend = AxonExecutorApplyAdapter::from_root( - current_block.header.state_root, - Arc::clone(&trie_db), - Arc::clone(&storage), - Proposal::new_without_state_root(¤t_block.header).into(), - ) - .unwrap(); - - system_contract::init(inner_db, &mut backend); - } - - // Init mempool and recover signed transactions with the current block number - let current_stxs = txs_wal.load_by_number(current_block.header.number + 1); - log::info!("Recover {} txs from wal", current_stxs.len()); - - let mempool = self - .init_mempool(&trie_db, &network_service.handle(), &storage, ¤t_stxs) - .await; - - // Get the validator list from current metadata for consensus initialization - let metadata_root = AxonExecutorReadOnlyAdapter::from_root( - current_state_root, - Arc::clone(&trie_db), - Arc::clone(&storage), - Proposal::new_without_state_root(&self.genesis.block.header).into(), - )? - .get_metadata_root(); - let metadata = MetadataHandle::new(metadata_root) - .get_metadata_by_block_number(current_block.header.number)?; - let validators: Vec = metadata - .verifier_list - .iter() - .map(|v| Validator { - pub_key: v.pub_key.as_bytes(), - propose_weight: v.propose_weight, - vote_weight: v.vote_weight, - }) - .collect::>(); - - // Set args in mempool - mempool.set_args( - Context::new(), - current_block.header.state_root, - metadata.consensus_config.gas_limit, - metadata.consensus_config.max_tx_size, - ); - - // Init overlord consensus and synchronization - let lock = Arc::new(AsyncMutex::new(())); - let crypto = self.init_crypto(&metadata.verifier_list); - let consensus_adapter = OverlordConsensusAdapter::<_, _, _, _>::new( - Arc::new(network_service.handle()), - Arc::clone(&mempool), - Arc::clone(&storage), - Arc::clone(&trie_db), + // Init mempool and recover signed transactions with the current block number + let current_stxs = txs_wal.load_by_number(current_block.header.number + 1); + log::info!("Recover {} txs from wal", current_stxs.len()); + + let mempool = init_mempool( + &config.mempool, + ¤t_block.header, + &storage, + &trie_db, + &network_service.handle(), + ¤t_stxs, + ) + .await; + + // Get the validator list from current metadata for consensus initialization + let metadata_root = AxonExecutorReadOnlyAdapter::from_root( + current_state_root, + Arc::clone(&trie_db), + Arc::clone(&storage), + Proposal::new_without_state_root(¤t_block.header).into(), + )? + .get_metadata_root(); + let metadata = MetadataHandle::new(metadata_root) + .get_metadata_by_block_number(current_block.header.number)?; + let validators: Vec = metadata + .verifier_list + .iter() + .map(|v| Validator { + pub_key: v.pub_key.as_bytes(), + propose_weight: v.propose_weight, + vote_weight: v.vote_weight, + }) + .collect::>(); + + // Set args in mempool + mempool.set_args( + Context::new(), + current_block.header.state_root, + metadata.consensus_config.gas_limit, + metadata.consensus_config.max_tx_size, + ); + + // Init overlord consensus and synchronization + let lock = Arc::new(AsyncMutex::new(())); + let crypto = init_crypto(config.privkey.as_ref(), &metadata.verifier_list)?; + let consensus_adapter = OverlordConsensusAdapter::<_, _, _, _>::new( + Arc::new(network_service.handle()), + Arc::clone(&mempool), + Arc::clone(&storage), + Arc::clone(&trie_db), + Arc::clone(&crypto), + )?; + let consensus_adapter = Arc::new(consensus_adapter); + let status_agent = get_status_agent(&storage, ¤t_block, &metadata).await?; + + let overlord_consensus = { + let consensus_wal_path = config.data_path_for_consensus_wal(); + let node_info = Secp256k1PrivateKey::try_from(config.privkey.as_ref()) + .map(|privkey| NodeInfo::new(current_block.header.chain_id, privkey.pub_key())) + .map_err(MainError::Crypto)?; + let overlord_consensus = OverlordConsensus::new( + status_agent.clone(), + node_info, Arc::clone(&crypto), - )?; - let consensus_adapter = Arc::new(consensus_adapter); - let status_agent = self - .get_status_agent(&storage, ¤t_block, &metadata) - .await; + Arc::clone(&txs_wal), + Arc::clone(&consensus_adapter), + Arc::clone(&lock), + Arc::new(ConsensusWal::new(consensus_wal_path)), + ) + .await; + Arc::new(overlord_consensus) + }; + + consensus_adapter.set_overlord_handler(overlord_consensus.get_overlord_handler()); + + let synchronization = Arc::new(OverlordSynchronization::<_>::new( + config.sync.sync_txs_chunk_size, + consensus_adapter, + status_agent.clone(), + lock, + )); + + network_service.tag_consensus(&metadata.verifier_list)?; + + // register endpoints to network service + network_service.register_mempool_endpoint(&mempool)?; + network_service.register_consensus_endpoint(&overlord_consensus)?; + network_service.register_synchronization_endpoint(&synchronization)?; + network_service.register_storage_endpoint(&storage)?; + network_service.register_rpc()?; + + let network_handle = network_service.handle(); + + // Run network service at the end of its life cycle + tokio::spawn(network_service.run()); + + // Run API + let api_adapter = Arc::new(DefaultAPIAdapter::new( + Arc::clone(&mempool), + Arc::clone(&storage), + Arc::clone(&trie_db), + Arc::new(network_handle), + )); + let _handles = run_jsonrpc_server(version, config, api_adapter).await?; + + // Run sync + tokio::spawn(async move { + if let Err(e) = synchronization.polling_broadcast().await { + log::error!("synchronization: {:?}", e); + } + }); - let overlord_consensus = self - .init_overlord_consensus(&status_agent, &txs_wal, &crypto, &lock, &consensus_adapter) - .await; + // Run consensus + run_overlord_consensus(metadata, validators, current_block, overlord_consensus); - consensus_adapter.set_overlord_handler(overlord_consensus.get_overlord_handler()); + components::system::set_ctrl_c_handle().await; + components::profiling::stop(); - let synchronization = Arc::new(OverlordSynchronization::<_>::new( - self.config.sync.sync_txs_chunk_size, - consensus_adapter, - status_agent.clone(), - lock, - )); + Ok(()) +} - network_service.tag_consensus(&metadata.verifier_list)?; +fn init_network_service( + config: &Config, + chain_id: u64, + key_provider: Option, +) -> ProtocolResult>> { + let network_config = NetworkConfig::from_config(config, chain_id)?; - // register endpoints to network service - network_service.register_mempool_endpoint(&mempool)?; - network_service.register_consensus_endpoint(&overlord_consensus)?; - network_service.register_synchronization_endpoint(&synchronization)?; - network_service.register_storage_endpoint(&storage)?; - network_service.register_rpc()?; + let key = key_provider + .map(KeyP::Custom) + .unwrap_or(KeyP::Default(network_config.secio_keypair.clone())); - let network_handle = network_service.handle(); + Ok(NetworkService::new(network_config, key)) +} - // Run network service at the end of its life cycle - tokio::spawn(network_service.run()); +async fn init_mempool( + config: &ConfigMempool, + current_header: &Header, + storage: &Arc, + trie_db: &Arc, + network_service: &N, + signed_txs: &[SignedTransaction], +) -> Arc>> +where + N: Rpc + PeerTrust + Gossip + Clone + Unpin + 'static, + S: Storage + 'static, + DB: TrieDB + Send + Sync + 'static, +{ + let mempool_adapter = DefaultMemPoolAdapter::::new( + network_service.clone(), + Arc::clone(storage), + Arc::clone(trie_db), + current_header.chain_id, + current_header.gas_limit.as_u64(), + config.pool_size as usize, + config.broadcast_txs_size, + config.broadcast_txs_interval, + ); + let mempool = Arc::new( + MemPoolImpl::new( + config.pool_size as usize, + config.timeout_gap, + mempool_adapter, + signed_txs.to_owned(), + ) + .await, + ); + + // Clone the mempool and spawn a thread to monitor the mempool length. + let monitor_mempool = Arc::clone(&mempool); + tokio::spawn(async move { + let interval = Duration::from_millis(1000); + loop { + sleep(interval).await; + MEMPOOL_LEN_GAUGE.set(monitor_mempool.len() as i64); + MEMPOOL_CO_QUEUE_LEN.set(monitor_mempool.len() as i64); + } + }); - // Run API - let api_adapter = Arc::new(DefaultAPIAdapter::new( - Arc::clone(&mempool), - Arc::clone(&storage), - Arc::clone(&trie_db), - Arc::new(network_handle), - )); - let _handles = run_jsonrpc_server(self.version, self.config.clone(), api_adapter).await?; - - // Run sync - tokio::spawn(async move { - if let Err(e) = synchronization.polling_broadcast().await { - log::error!("synchronization: {:?}", e); - } - }); + mempool +} - // Run consensus - Self::run_overlord_consensus(metadata, validators, current_block, overlord_consensus); +fn init_crypto( + privkey: &[u8], + validators: &[ValidatorExtend], +) -> ProtocolResult> { + let bls_priv_key = BlsPrivateKey::try_from(privkey).map_err(MainError::Crypto)?; + + let mut bls_pub_keys = HashMap::new(); + for validator_extend in validators.iter() { + let address = validator_extend.pub_key.as_bytes(); + let hex_pubkey = validator_extend.bls_pub_key.as_bytes(); + let pub_key = BlsPublicKey::try_from(hex_pubkey.as_ref()).map_err(MainError::Crypto)?; + bls_pub_keys.insert(address, pub_key); + } - components::system::set_ctrl_c_handle().await; - components::profiling::stop(); + let crypto = OverlordCrypto::new(bls_priv_key, bls_pub_keys, String::new()); + Ok(Arc::new(crypto)) +} - Ok(()) - } +async fn get_status_agent( + storage: &Arc, + block: &Block, + metadata: &Metadata, +) -> ProtocolResult { + let header = &block.header; + let latest_proof = storage.get_latest_proof(Context::new()).await?; + let current_consensus_status = CurrentStatus { + prev_hash: block.hash(), + last_number: header.number, + max_tx_size: metadata.consensus_config.max_tx_size.into(), + tx_num_limit: metadata.consensus_config.tx_num_limit, + proof: latest_proof, + last_state_root: header.state_root, + }; + + CHAIN_ID.swap(Arc::new(header.chain_id)); + + let status_agent = StatusAgent::new(current_consensus_status); + Ok(status_agent) +} - fn init_network_service( - &self, - key_provider: Option, - ) -> NetworkService> { - let network_config = - NetworkConfig::from_config(&self.config, self.genesis.block.header.chain_id).unwrap(); +fn run_overlord_consensus( + metadata: Metadata, + validators: Vec, + current_block: Block, + overlord_consensus: Arc>>, +) where + M: MemPool, + N: Rpc + PeerTrust + Gossip + Network + 'static, + S: Storage, + DB: TrieDB + Send + Sync, +{ + let timer_config = DurationConfig { + propose_ratio: metadata.consensus_config.propose_ratio, + prevote_ratio: metadata.consensus_config.prevote_ratio, + precommit_ratio: metadata.consensus_config.precommit_ratio, + brake_ratio: metadata.consensus_config.brake_ratio, + }; + + tokio::spawn(async move { + if let Err(e) = overlord_consensus + .run( + current_block.header.number, + metadata.consensus_config.interval, + validators, + Some(timer_config), + ) + .await + { + log::error!("axon-consensus: {:?} error", e); + } + }); +} - let key = key_provider - .map(KeyP::Custom) - .unwrap_or(KeyP::Default(network_config.secio_keypair.clone())); +async fn execute_genesis( + mut partial_genesis: RichBlock, + spec: ChainSpec, + db_group: &DatabaseGroup, +) -> ProtocolResult { + let resp = execute_transactions(&partial_genesis, db_group, &spec.accounts)?; - NetworkService::new(network_config, key) - } + partial_genesis.block.header.state_root = resp.state_root; + partial_genesis.block.header.receipts_root = resp.receipt_root; - async fn init_mempool( - &self, - trie_db: &Arc, - network_service: &N, - storage: &Arc, - signed_txs: &[SignedTransaction], - ) -> Arc>> - where - N: Rpc + PeerTrust + Gossip + Clone + Unpin + 'static, - S: Storage + 'static, - DB: TrieDB + Send + Sync + 'static, - { - let mempool_adapter = DefaultMemPoolAdapter::::new( - network_service.clone(), - Arc::clone(storage), - Arc::clone(trie_db), - self.genesis.block.header.chain_id, - self.genesis.block.header.gas_limit.as_u64(), - self.config.mempool.pool_size as usize, - self.config.mempool.broadcast_txs_size, - self.config.mempool.broadcast_txs_interval, - ); - let mempool = Arc::new( - MemPoolImpl::new( - self.config.mempool.pool_size as usize, - self.config.mempool.timeout_gap, - mempool_adapter, - signed_txs.to_owned(), - ) - .await, - ); - - // Clone the mempool and spawn a thread to monitor the mempool length. - let monitor_mempool = Arc::clone(&mempool); - tokio::spawn(async move { - let interval = Duration::from_millis(1000); - loop { - sleep(interval).await; - MEMPOOL_LEN_GAUGE.set(monitor_mempool.len() as i64); - MEMPOOL_CO_QUEUE_LEN.set(monitor_mempool.len() as i64); - } - }); + log::info!("The genesis block is executed {:?}", partial_genesis.block); + log::info!("Response for genesis is {:?}", resp); - mempool - } + db_group + .storage() + .save_block(&partial_genesis, &resp) + .await?; - fn init_crypto(&self, validators: &[ValidatorExtend]) -> Arc { - // self private key - let bls_priv_key = BlsPrivateKey::try_from(self.config.privkey.as_ref()) - .map_err(MainError::Crypto) - .unwrap(); - - let mut bls_pub_keys = HashMap::new(); - for validator_extend in validators.iter() { - let address = validator_extend.pub_key.as_bytes(); - let hex_pubkey = hex_decode(&validator_extend.bls_pub_key.as_string_trim0x()).unwrap(); - let pub_key = BlsPublicKey::try_from(hex_pubkey.as_ref()) - .map_err(MainError::Crypto) - .unwrap(); - bls_pub_keys.insert(address, pub_key); - } + Ok(partial_genesis) +} - Arc::new(OverlordCrypto::new( - bls_priv_key, - bls_pub_keys, - String::new(), - )) - } +async fn execute_genesis_temporarily( + mut partial_genesis: RichBlock, + spec: ChainSpec, + config: &ConfigRocksDB, + triedb_cache_size: usize, +) -> ProtocolResult { + let tmp_dir = tempfile::tempdir().map_err(|err| { + let err_msg = format!("failed to create temporary directory since {err:?}"); + MainError::Other(err_msg) + })?; + let path_block = tmp_dir.path().join("block"); - async fn get_status_agent( - &self, - storage: &Arc, - block: &Block, - metadata: &Metadata, - ) -> StatusAgent { - let header = &block.header; - let latest_proof = storage.get_latest_proof(Context::new()).await.unwrap(); - let current_consensus_status = CurrentStatus { - prev_hash: block.hash(), - last_number: header.number, - max_tx_size: metadata.consensus_config.max_tx_size.into(), - tx_num_limit: metadata.consensus_config.tx_num_limit, - proof: latest_proof, - last_state_root: if header.number == 0 { - self.state_root - } else { - header.state_root - }, - }; + let db_group = DatabaseGroup::new(config, path_block, triedb_cache_size)?; - CHAIN_ID.swap(Arc::new(header.chain_id)); + let resp = execute_transactions(&partial_genesis, &db_group, &spec.accounts)?; - StatusAgent::new(current_consensus_status) - } + partial_genesis.block.header.state_root = resp.state_root; + partial_genesis.block.header.receipts_root = resp.receipt_root; - async fn init_overlord_consensus( - &self, - status_agent: &StatusAgent, - txs_wal: &Arc, - crypto: &Arc, - lock: &Arc>, - consensus_adapter: &Arc>, - ) -> Arc>> - where - M: MemPool + 'static, - N: Rpc + PeerTrust + Gossip + Network + 'static, - S: Storage + 'static, - DB: TrieDB + Send + Sync + 'static, - { - let consensus_wal_path = self - .config - .data_path_for_consensus_wal() - .to_str() - .unwrap() - .to_string(); - let consensus_wal = Arc::new(ConsensusWal::new(consensus_wal_path)); - - let my_privkey = Secp256k1PrivateKey::try_from(self.config.privkey.as_ref()) - .map_err(MainError::Crypto) - .unwrap(); - let node_info = NodeInfo::new(self.genesis.block.header.chain_id, my_privkey.pub_key()); - - Arc::new( - OverlordConsensus::new( - status_agent.clone(), - node_info, - Arc::clone(crypto), - Arc::clone(txs_wal), - Arc::clone(consensus_adapter), - Arc::clone(lock), - Arc::clone(&consensus_wal), - ) - .await, - ) - } + log::info!("The genesis block is executed {:?}", partial_genesis.block); + log::info!("Response for genesis is {:?}", resp); - fn run_overlord_consensus( - metadata: Metadata, - validators: Vec, - current_block: Block, - overlord_consensus: Arc>>, - ) where - M: MemPool, - N: Rpc + PeerTrust + Gossip + Network + 'static, - S: Storage, - DB: TrieDB + Send + Sync, - { - let timer_config = DurationConfig { - propose_ratio: metadata.consensus_config.propose_ratio, - prevote_ratio: metadata.consensus_config.prevote_ratio, - precommit_ratio: metadata.consensus_config.precommit_ratio, - brake_ratio: metadata.consensus_config.brake_ratio, - }; - - tokio::spawn(async move { - if let Err(e) = overlord_consensus - .run( - current_block.header.number, - metadata.consensus_config.interval, - validators, - Some(timer_config), - ) - .await - { - log::error!("axon-consensus: {:?} error", e); - } - }); - } + Ok(partial_genesis) } -fn execute_transactions( +fn execute_transactions( rich: &RichBlock, - storage: &Arc, - trie_db: &Arc, - inner_db: &Arc, + db_group: &DatabaseGroup, accounts: &[InitialAccount], -) -> ProtocolResult -where - S: Storage + 'static, -{ - let state_root = MPTTrie::new(Arc::clone(&trie_db)) - .insert_accounts(accounts)? +) -> ProtocolResult { + let state_root = MPTTrie::new(db_group.trie_db()) + .insert_accounts(accounts) + .expect("insert accounts") .commit()?; let executor = AxonExecutor::default(); let mut backend = AxonExecutorApplyAdapter::from_root( state_root, - Arc::clone(trie_db), - Arc::clone(storage), + db_group.trie_db(), + db_group.storage(), Proposal::new_without_state_root(&rich.block.header).into(), )?; - system_contract::init(Arc::clone(inner_db), &mut backend); + system_contract::init(db_group.inner_db(), &mut backend); let resp = executor.exec(&mut backend, &rich.txs, &[]); diff --git a/core/run/src/tests.rs b/core/run/src/tests.rs index 7ea53ca32..705ca33e3 100644 --- a/core/run/src/tests.rs +++ b/core/run/src/tests.rs @@ -18,7 +18,7 @@ use protocol::{ types::{RichBlock, H256}, }; -use crate::{execute_transactions, init_storage}; +use crate::{execute_transactions, DatabaseGroup}; const DEV_CONFIG_DIR: &str = "../../devtools/chain"; @@ -129,21 +129,15 @@ async fn check_genesis_data<'a>(case: &TestCase<'a>) { ); } let path_block = tmp_dir.path().join("block"); - let (storage, trie_db, inner_db) = init_storage( + let db_group = DatabaseGroup::new( &config.rocksdb, path_block, config.executor.triedb_cache_size, ) - .expect("initialize storage"); - - let resp = execute_transactions( - &genesis, - &storage, - &trie_db, - &inner_db, - &chain_spec.accounts, - ) - .expect("execute transactions"); + .expect("initialize databases"); + + let resp = execute_transactions(&genesis, &db_group, &chain_spec.accounts) + .expect("execute transactions"); check_hashes( case.chain_name,