From 68e2b0e6a2eb9a04a9cc65fdd62401b602a3a0eb Mon Sep 17 00:00:00 2001 From: Boyu Yang Date: Tue, 29 Aug 2023 21:36:42 +0800 Subject: [PATCH] refactor(core-run): only required parameters in function signatures --- core/run/src/components/storage.rs | 48 +- core/run/src/lib.rs | 818 ++++++++++++++--------------- core/run/src/tests.rs | 18 +- 3 files changed, 420 insertions(+), 464 deletions(-) diff --git a/core/run/src/components/storage.rs b/core/run/src/components/storage.rs index 2f488ee54..845a88190 100644 --- a/core/run/src/components/storage.rs +++ b/core/run/src/components/storage.rs @@ -13,20 +13,40 @@ use protocol::{ ProtocolResult, }; -pub(crate) fn init_storage>( - config: &ConfigRocksDB, - rocksdb_path: P, - triedb_cache_size: usize, -) -> ProtocolResult<( - Arc>, - Arc, - Arc, -)> { - let adapter = Arc::new(RocksAdapter::new(rocksdb_path, config.clone())?); - let inner_db = adapter.inner_db(); - let trie_db = Arc::new(RocksTrieDB::new_evm(adapter.inner_db(), triedb_cache_size)); - let storage = Arc::new(ImplStorage::new(adapter, config.cache_size)); - Ok((storage, trie_db, inner_db)) +pub(crate) struct DatabaseGroup { + storage: Arc>, + trie_db: Arc, + inner_db: Arc, +} + +impl DatabaseGroup { + pub(crate) fn new>( + config: &ConfigRocksDB, + rocksdb_path: P, + triedb_cache_size: usize, + ) -> ProtocolResult { + let adapter = Arc::new(RocksAdapter::new(rocksdb_path, config.clone())?); + let inner_db = adapter.inner_db(); + let trie_db = Arc::new(RocksTrieDB::new_evm(adapter.inner_db(), triedb_cache_size)); + let storage = Arc::new(ImplStorage::new(adapter, config.cache_size)); + Ok(Self { + storage, + trie_db, + inner_db, + }) + } + + pub(crate) fn storage(&self) -> Arc> { + Arc::clone(&self.storage) + } + + pub(crate) fn trie_db(&self) -> Arc { + Arc::clone(&self.trie_db) + } + + pub(crate) fn inner_db(&self) -> Arc { + Arc::clone(&self.inner_db) + } } #[async_trait] diff --git a/core/run/src/lib.rs b/core/run/src/lib.rs index eeec8f615..09c308547 100644 --- a/core/run/src/lib.rs +++ b/core/run/src/lib.rs @@ -4,10 +4,9 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use common_apm::metrics::mempool::{MEMPOOL_CO_QUEUE_LEN, MEMPOOL_LEN_GAUGE}; use common_config_parser::types::spec::{ChainSpec, InitialAccount}; -use common_config_parser::types::Config; +use common_config_parser::types::{Config, ConfigMempool, ConfigRocksDB}; use common_crypto::{BlsPrivateKey, BlsPublicKey, Secp256k1, Secp256k1PrivateKey, ToPublicKey}; -use protocol::codec::hex_decode; use protocol::tokio::{ self, runtime::Builder as RuntimeBuilder, sync::Mutex as AsyncMutex, time::sleep, }; @@ -15,8 +14,8 @@ use protocol::traits::{ Context, Executor, Gossip, MemPool, Network, NodeInfo, PeerTrust, ReadOnlyStorage, Rpc, Storage, }; use protocol::types::{ - Block, ExecResp, MerkleRoot, Metadata, Proposal, RichBlock, SignedTransaction, Validator, - ValidatorExtend, H256, + Block, ExecResp, Header, Metadata, Proposal, RichBlock, SignedTransaction, Validator, + ValidatorExtend, }; use protocol::{lazy::CHAIN_ID, trie::DB as TrieDB, ProtocolResult}; @@ -26,15 +25,11 @@ use core_consensus::{ util::OverlordCrypto, ConsensusWal, DurationConfig, OverlordConsensus, OverlordConsensusAdapter, OverlordSynchronization, SignedTxsWAL, }; -use core_db::{RocksAdapter, RocksDB}; use core_executor::system_contract::{self, metadata::MetadataHandle}; -use core_executor::{ - AxonExecutor, AxonExecutorApplyAdapter, AxonExecutorReadOnlyAdapter, MPTTrie, RocksTrieDB, -}; +use core_executor::{AxonExecutor, AxonExecutorApplyAdapter, AxonExecutorReadOnlyAdapter, MPTTrie}; use core_interoperation::InteroperationImpl; use core_mempool::{DefaultMemPoolAdapter, MemPoolImpl}; use core_network::{observe_listen_port_occupancy, NetworkConfig, NetworkService}; -use core_storage::ImplStorage; pub use core_network::{KeyProvider, SecioKeyPair}; @@ -48,18 +43,17 @@ mod tests; use components::{ extensions::ExtensionConfig as _, network::NetworkServiceExt as _, - storage::{init_storage, StorageExt as _, TrieExt as _}, + storage::{DatabaseGroup, StorageExt as _, TrieExt as _}, }; pub use error::MainError; use key_provider::KeyP; #[derive(Debug)] pub struct Axon { - version: String, - config: Config, - spec: ChainSpec, - genesis: RichBlock, - state_root: MerkleRoot, + version: String, + config: Config, + spec: ChainSpec, + genesis: RichBlock, } impl Axon { @@ -69,12 +63,16 @@ impl Axon { config, spec, genesis, - state_root: MerkleRoot::default(), } } - pub fn run(mut self, key_provider: Option) -> ProtocolResult<()> { - components::profiling::start(); + pub fn run(self, key_provider: Option) -> ProtocolResult<()> { + let Self { + version, + config, + spec, + genesis, + } = self; let rt = RuntimeBuilder::new_multi_thread() .enable_all() @@ -82,476 +80,420 @@ impl Axon { .expect("new tokio runtime"); rt.block_on(async move { - let (storage, trie_db, inner_db) = init_storage( - &self.config.rocksdb, - self.config.data_path_for_rocksdb(), - self.config.executor.triedb_cache_size, + let db_group = DatabaseGroup::new( + &config.rocksdb, + config.data_path_for_rocksdb(), + config.executor.triedb_cache_size, )?; - if let Some(genesis) = storage.try_load_genesis().await? { - log::info!("The Genesis block has been initialized."); - self.apply_genesis_after_checks(&genesis).await?; + if let Some(loaded_genesis) = db_group.storage().try_load_genesis().await? { + log::info!("Check genesis block."); + let genesis = execute_genesis_temporarily( + genesis, + spec, + &config.rocksdb, + config.executor.triedb_cache_size, + ) + .await?; + if genesis.block != loaded_genesis { + let err_msg = format!( + "The user provided genesis (hash: {:#x}) is NOT \ + the same as the genesis in storage (hash: {:#x})", + genesis.block.hash(), + loaded_genesis.hash() + ); + return Err(MainError::Other(err_msg).into()); + } } else { - self.create_genesis(&storage, &trie_db, &inner_db).await?; + log::info!("Initialize genesis block."); + let _genesis = execute_genesis(genesis, spec, &db_group).await?; } - - self.start(key_provider, storage, trie_db, inner_db).await + start(version, config, key_provider, &db_group).await })?; rt.shutdown_timeout(std::time::Duration::from_secs(1)); Ok(()) } +} - async fn create_genesis( - &mut self, - storage: &Arc>, - trie_db: &Arc, - inner_db: &Arc, - ) -> ProtocolResult<()> { - let resp = execute_transactions( - &self.genesis, - storage, - trie_db, - inner_db, - &self.spec.accounts, - )?; - - log::info!( - "Execute the genesis distribute success, genesis state root {:?}, response {:?}", - resp.state_root, - resp - ); - - self.state_root = resp.state_root; - self.apply_genesis_data(resp.state_root, resp.receipt_root)?; - - log::info!("The genesis block is created {:?}", self.genesis.block); - - storage.save_block(&self.genesis, &resp).await?; - - Ok(()) - } - - async fn apply_genesis_after_checks(&mut self, loaded_genesis: &Block) -> ProtocolResult<()> { - let tmp_dir = tempfile::tempdir().map_err(|err| { - let err_msg = format!("failed to create temporary directory since {err:?}"); - MainError::Other(err_msg) +async fn start( + version: String, + config: Config, + key_provider: Option, + db_group: &DatabaseGroup, +) -> ProtocolResult<()> { + let storage = db_group.storage(); + let trie_db = db_group.trie_db(); + let inner_db = db_group.inner_db(); + + components::profiling::start(); + components::profiling::track_db_process("blockdb", &inner_db); + components::profiling::track_current_process(); + + // Start jaeger + config.jaeger.start_if_possible(); + + // Start prometheus http server + config.prometheus.start_if_possible(); + + log::info!("node starts"); + + observe_listen_port_occupancy(&[config.network.listening_address.clone()]).await?; + + // Init Block db and get the current block + let current_block = storage.get_latest_block(Context::new()).await?; + let current_state_root = current_block.header.state_root; + + log::info!("At block number {}", current_block.header.number + 1); + + // Init network + let mut network_service = + init_network_service(&config, current_block.header.chain_id, key_provider)?; + + // Init full transactions wal + let txs_wal_path = config + .data_path_for_txs_wal() + .to_str() + .map(ToOwned::to_owned) + .ok_or_else(|| { + let msg = format!( + "failed to convert WAL path {} to string", + config.data_path_for_txs_wal().display() + ); + MainError::Other(msg) })?; - let path_block = tmp_dir.path().join("block"); - - let (storage, trie_db, inner_db) = init_storage( - &self.config.rocksdb, - path_block, - self.config.executor.triedb_cache_size, - )?; + let txs_wal = Arc::new(SignedTxsWAL::new(txs_wal_path)); - let resp = execute_transactions( - &self.genesis, - &storage, - &trie_db, - &inner_db, - &self.spec.accounts, + // Init system contract + if current_block.header.number != 0 { + let mut backend = AxonExecutorApplyAdapter::from_root( + current_block.header.state_root, + Arc::clone(&trie_db), + Arc::clone(&storage), + Proposal::new_without_state_root(¤t_block.header).into(), )?; - self.apply_genesis_data(resp.state_root, resp.receipt_root)?; - - let user_provided_genesis = &self.genesis.block; - if user_provided_genesis != loaded_genesis { - let err_msg = format!( - "The user provided genesis (hash: {:#x}) is NOT \ - the same as the genesis in storage (hash: {:#x})", - user_provided_genesis.hash(), - loaded_genesis.hash() - ); - return Err(MainError::Other(err_msg).into()); - } - - Ok(()) + system_contract::init(inner_db, &mut backend); } - fn apply_genesis_data(&mut self, state_root: H256, receipts_root: H256) -> ProtocolResult<()> { - if self.genesis.block.header.state_root.is_zero() { - self.genesis.block.header.state_root = state_root; - } else if self.genesis.block.header.state_root != state_root { - let errmsg = format!( - "The state root of genesis block which user provided is incorrect, \ - if you don't know it, you can just set it as {:#x}.", - H256::default() - ); - return Err(MainError::Other(errmsg).into()); - } - if self.genesis.block.header.receipts_root.is_zero() { - self.genesis.block.header.receipts_root = receipts_root; - } else if self.genesis.block.header.receipts_root != receipts_root { - let errmsg = format!( - "The receipts root of genesis block which user provided is incorrect, \ - if you don't know it, you can just set it as {:#x}.", - H256::default() - ); - return Err(MainError::Other(errmsg).into()); - } - Ok(()) - } - - pub async fn start( - self, - key_provider: Option, - storage: Arc>, - trie_db: Arc, - inner_db: Arc, - ) -> ProtocolResult<()> { - // Start jaeger - self.config.jaeger.start_if_possible(); - - // Start prometheus http server - self.config.prometheus.start_if_possible(); - - components::profiling::track_db_process("blockdb", &inner_db); - components::profiling::track_current_process(); - - log::info!("node starts"); - - observe_listen_port_occupancy(&[self.config.network.listening_address.clone()]).await?; - - // Init Block db and get the current block - let current_block = storage.get_latest_block(Context::new()).await?; - let current_state_root = current_block.header.state_root; - - log::info!("At block number {}", current_block.header.number + 1); - - // Init network - let mut network_service = self.init_network_service(key_provider); - - // Init full transactions wal - let txs_wal_path = self - .config - .data_path_for_txs_wal() - .to_str() - .unwrap() - .to_string(); - let txs_wal = Arc::new(SignedTxsWAL::new(txs_wal_path)); - - // Init system contract - if current_block.header.number != 0 { - let mut backend = AxonExecutorApplyAdapter::from_root( - current_block.header.state_root, - Arc::clone(&trie_db), - Arc::clone(&storage), - Proposal::new_without_state_root(¤t_block.header).into(), - ) - .unwrap(); - - system_contract::init(inner_db, &mut backend); - } - - // Init mempool and recover signed transactions with the current block number - let current_stxs = txs_wal.load_by_number(current_block.header.number + 1); - log::info!("Recover {} txs from wal", current_stxs.len()); - - let mempool = self - .init_mempool(&trie_db, &network_service.handle(), &storage, ¤t_stxs) - .await; - - // Get the validator list from current metadata for consensus initialization - let metadata_root = AxonExecutorReadOnlyAdapter::from_root( - current_state_root, - Arc::clone(&trie_db), - Arc::clone(&storage), - Proposal::new_without_state_root(&self.genesis.block.header).into(), - )? - .get_metadata_root(); - let metadata = MetadataHandle::new(metadata_root) - .get_metadata_by_block_number(current_block.header.number)?; - let validators: Vec = metadata - .verifier_list - .iter() - .map(|v| Validator { - pub_key: v.pub_key.as_bytes(), - propose_weight: v.propose_weight, - vote_weight: v.vote_weight, - }) - .collect::>(); - - // Set args in mempool - mempool.set_args( - Context::new(), - current_block.header.state_root, - metadata.consensus_config.gas_limit, - metadata.consensus_config.max_tx_size, - ); - - // Init overlord consensus and synchronization - let lock = Arc::new(AsyncMutex::new(())); - let crypto = self.init_crypto(&metadata.verifier_list); - let consensus_adapter = OverlordConsensusAdapter::<_, _, _, _>::new( - Arc::new(network_service.handle()), - Arc::clone(&mempool), - Arc::clone(&storage), - Arc::clone(&trie_db), + // Init mempool and recover signed transactions with the current block number + let current_stxs = txs_wal.load_by_number(current_block.header.number + 1); + log::info!("Recover {} txs from wal", current_stxs.len()); + + let mempool = init_mempool( + &config.mempool, + ¤t_block.header, + &storage, + &trie_db, + &network_service.handle(), + ¤t_stxs, + ) + .await; + + // Get the validator list from current metadata for consensus initialization + let metadata_root = AxonExecutorReadOnlyAdapter::from_root( + current_state_root, + Arc::clone(&trie_db), + Arc::clone(&storage), + Proposal::new_without_state_root(¤t_block.header).into(), + )? + .get_metadata_root(); + let metadata = MetadataHandle::new(metadata_root) + .get_metadata_by_block_number(current_block.header.number)?; + let validators: Vec = metadata + .verifier_list + .iter() + .map(|v| Validator { + pub_key: v.pub_key.as_bytes(), + propose_weight: v.propose_weight, + vote_weight: v.vote_weight, + }) + .collect::>(); + + // Set args in mempool + mempool.set_args( + Context::new(), + current_block.header.state_root, + metadata.consensus_config.gas_limit, + metadata.consensus_config.max_tx_size, + ); + + // Init overlord consensus and synchronization + let lock = Arc::new(AsyncMutex::new(())); + let crypto = init_crypto(config.privkey.as_ref(), &metadata.verifier_list)?; + let consensus_adapter = OverlordConsensusAdapter::<_, _, _, _>::new( + Arc::new(network_service.handle()), + Arc::clone(&mempool), + Arc::clone(&storage), + Arc::clone(&trie_db), + Arc::clone(&crypto), + )?; + let consensus_adapter = Arc::new(consensus_adapter); + let status_agent = get_status_agent(&storage, ¤t_block, &metadata).await?; + + let overlord_consensus = { + let consensus_wal_path = config.data_path_for_consensus_wal(); + let node_info = Secp256k1PrivateKey::try_from(config.privkey.as_ref()) + .map(|privkey| NodeInfo::new(current_block.header.chain_id, privkey.pub_key())) + .map_err(MainError::Crypto)?; + let overlord_consensus = OverlordConsensus::new( + status_agent.clone(), + node_info, Arc::clone(&crypto), - )?; - let consensus_adapter = Arc::new(consensus_adapter); - let status_agent = self - .get_status_agent(&storage, ¤t_block, &metadata) - .await; + Arc::clone(&txs_wal), + Arc::clone(&consensus_adapter), + Arc::clone(&lock), + Arc::new(ConsensusWal::new(consensus_wal_path)), + ) + .await; + Arc::new(overlord_consensus) + }; + + consensus_adapter.set_overlord_handler(overlord_consensus.get_overlord_handler()); + + let synchronization = Arc::new(OverlordSynchronization::<_>::new( + config.sync.sync_txs_chunk_size, + consensus_adapter, + status_agent.clone(), + lock, + )); + + network_service.tag_consensus(&metadata.verifier_list)?; + + // register endpoints to network service + network_service.register_mempool_endpoint(&mempool)?; + network_service.register_consensus_endpoint(&overlord_consensus)?; + network_service.register_synchronization_endpoint(&synchronization)?; + network_service.register_storage_endpoint(&storage)?; + network_service.register_rpc()?; + + let network_handle = network_service.handle(); + + // Run network service at the end of its life cycle + tokio::spawn(network_service.run()); + + // Run API + let api_adapter = Arc::new(DefaultAPIAdapter::new( + Arc::clone(&mempool), + Arc::clone(&storage), + Arc::clone(&trie_db), + Arc::new(network_handle), + )); + let _handles = run_jsonrpc_server(version, config, api_adapter).await?; + + // Run sync + tokio::spawn(async move { + if let Err(e) = synchronization.polling_broadcast().await { + log::error!("synchronization: {:?}", e); + } + }); - let overlord_consensus = self - .init_overlord_consensus(&status_agent, &txs_wal, &crypto, &lock, &consensus_adapter) - .await; + // Run consensus + run_overlord_consensus(metadata, validators, current_block, overlord_consensus); - consensus_adapter.set_overlord_handler(overlord_consensus.get_overlord_handler()); + components::system::set_ctrl_c_handle().await; + components::profiling::stop(); - let synchronization = Arc::new(OverlordSynchronization::<_>::new( - self.config.sync.sync_txs_chunk_size, - consensus_adapter, - status_agent.clone(), - lock, - )); + Ok(()) +} - network_service.tag_consensus(&metadata.verifier_list)?; +fn init_network_service( + config: &Config, + chain_id: u64, + key_provider: Option, +) -> ProtocolResult>> { + let network_config = NetworkConfig::from_config(config, chain_id)?; - // register endpoints to network service - network_service.register_mempool_endpoint(&mempool)?; - network_service.register_consensus_endpoint(&overlord_consensus)?; - network_service.register_synchronization_endpoint(&synchronization)?; - network_service.register_storage_endpoint(&storage)?; - network_service.register_rpc()?; + let key = key_provider + .map(KeyP::Custom) + .unwrap_or(KeyP::Default(network_config.secio_keypair.clone())); - let network_handle = network_service.handle(); + Ok(NetworkService::new(network_config, key)) +} - // Run network service at the end of its life cycle - tokio::spawn(network_service.run()); +async fn init_mempool( + config: &ConfigMempool, + current_header: &Header, + storage: &Arc, + trie_db: &Arc, + network_service: &N, + signed_txs: &[SignedTransaction], +) -> Arc>> +where + N: Rpc + PeerTrust + Gossip + Clone + Unpin + 'static, + S: Storage + 'static, + DB: TrieDB + Send + Sync + 'static, +{ + let mempool_adapter = DefaultMemPoolAdapter::::new( + network_service.clone(), + Arc::clone(storage), + Arc::clone(trie_db), + current_header.chain_id, + current_header.gas_limit.as_u64(), + config.pool_size as usize, + config.broadcast_txs_size, + config.broadcast_txs_interval, + ); + let mempool = Arc::new( + MemPoolImpl::new( + config.pool_size as usize, + config.timeout_gap, + mempool_adapter, + signed_txs.to_owned(), + ) + .await, + ); + + // Clone the mempool and spawn a thread to monitor the mempool length. + let monitor_mempool = Arc::clone(&mempool); + tokio::spawn(async move { + let interval = Duration::from_millis(1000); + loop { + sleep(interval).await; + MEMPOOL_LEN_GAUGE.set(monitor_mempool.len() as i64); + MEMPOOL_CO_QUEUE_LEN.set(monitor_mempool.len() as i64); + } + }); - // Run API - let api_adapter = Arc::new(DefaultAPIAdapter::new( - Arc::clone(&mempool), - Arc::clone(&storage), - Arc::clone(&trie_db), - Arc::new(network_handle), - )); - let _handles = run_jsonrpc_server(self.version, self.config.clone(), api_adapter).await?; - - // Run sync - tokio::spawn(async move { - if let Err(e) = synchronization.polling_broadcast().await { - log::error!("synchronization: {:?}", e); - } - }); + mempool +} - // Run consensus - Self::run_overlord_consensus(metadata, validators, current_block, overlord_consensus); +fn init_crypto( + privkey: &[u8], + validators: &[ValidatorExtend], +) -> ProtocolResult> { + let bls_priv_key = BlsPrivateKey::try_from(privkey).map_err(MainError::Crypto)?; + + let mut bls_pub_keys = HashMap::new(); + for validator_extend in validators.iter() { + let address = validator_extend.pub_key.as_bytes(); + let hex_pubkey = validator_extend.bls_pub_key.as_bytes(); + let pub_key = BlsPublicKey::try_from(hex_pubkey.as_ref()).map_err(MainError::Crypto)?; + bls_pub_keys.insert(address, pub_key); + } - components::system::set_ctrl_c_handle().await; - components::profiling::stop(); + let crypto = OverlordCrypto::new(bls_priv_key, bls_pub_keys, String::new()); + Ok(Arc::new(crypto)) +} - Ok(()) - } +async fn get_status_agent( + storage: &Arc, + block: &Block, + metadata: &Metadata, +) -> ProtocolResult { + let header = &block.header; + let latest_proof = storage.get_latest_proof(Context::new()).await?; + let current_consensus_status = CurrentStatus { + prev_hash: block.hash(), + last_number: header.number, + max_tx_size: metadata.consensus_config.max_tx_size.into(), + tx_num_limit: metadata.consensus_config.tx_num_limit, + proof: latest_proof, + last_state_root: header.state_root, + }; + + CHAIN_ID.swap(Arc::new(header.chain_id)); + + let status_agent = StatusAgent::new(current_consensus_status); + Ok(status_agent) +} - fn init_network_service( - &self, - key_provider: Option, - ) -> NetworkService> { - let network_config = - NetworkConfig::from_config(&self.config, self.genesis.block.header.chain_id).unwrap(); +fn run_overlord_consensus( + metadata: Metadata, + validators: Vec, + current_block: Block, + overlord_consensus: Arc>>, +) where + M: MemPool, + N: Rpc + PeerTrust + Gossip + Network + 'static, + S: Storage, + DB: TrieDB + Send + Sync, +{ + let timer_config = DurationConfig { + propose_ratio: metadata.consensus_config.propose_ratio, + prevote_ratio: metadata.consensus_config.prevote_ratio, + precommit_ratio: metadata.consensus_config.precommit_ratio, + brake_ratio: metadata.consensus_config.brake_ratio, + }; + + tokio::spawn(async move { + if let Err(e) = overlord_consensus + .run( + current_block.header.number, + metadata.consensus_config.interval, + validators, + Some(timer_config), + ) + .await + { + log::error!("axon-consensus: {:?} error", e); + } + }); +} - let key = key_provider - .map(KeyP::Custom) - .unwrap_or(KeyP::Default(network_config.secio_keypair.clone())); +async fn execute_genesis( + mut partial_genesis: RichBlock, + spec: ChainSpec, + db_group: &DatabaseGroup, +) -> ProtocolResult { + let resp = execute_transactions(&partial_genesis, db_group, &spec.accounts)?; - NetworkService::new(network_config, key) - } + partial_genesis.block.header.state_root = resp.state_root; + partial_genesis.block.header.receipts_root = resp.receipt_root; - async fn init_mempool( - &self, - trie_db: &Arc, - network_service: &N, - storage: &Arc, - signed_txs: &[SignedTransaction], - ) -> Arc>> - where - N: Rpc + PeerTrust + Gossip + Clone + Unpin + 'static, - S: Storage + 'static, - DB: TrieDB + Send + Sync + 'static, - { - let mempool_adapter = DefaultMemPoolAdapter::::new( - network_service.clone(), - Arc::clone(storage), - Arc::clone(trie_db), - self.genesis.block.header.chain_id, - self.genesis.block.header.gas_limit.as_u64(), - self.config.mempool.pool_size as usize, - self.config.mempool.broadcast_txs_size, - self.config.mempool.broadcast_txs_interval, - ); - let mempool = Arc::new( - MemPoolImpl::new( - self.config.mempool.pool_size as usize, - self.config.mempool.timeout_gap, - mempool_adapter, - signed_txs.to_owned(), - ) - .await, - ); - - // Clone the mempool and spawn a thread to monitor the mempool length. - let monitor_mempool = Arc::clone(&mempool); - tokio::spawn(async move { - let interval = Duration::from_millis(1000); - loop { - sleep(interval).await; - MEMPOOL_LEN_GAUGE.set(monitor_mempool.len() as i64); - MEMPOOL_CO_QUEUE_LEN.set(monitor_mempool.len() as i64); - } - }); + log::info!("The genesis block is executed {:?}", partial_genesis.block); + log::info!("Response for genesis is {:?}", resp); - mempool - } + db_group + .storage() + .save_block(&partial_genesis, &resp) + .await?; - fn init_crypto(&self, validators: &[ValidatorExtend]) -> Arc { - // self private key - let bls_priv_key = BlsPrivateKey::try_from(self.config.privkey.as_ref()) - .map_err(MainError::Crypto) - .unwrap(); - - let mut bls_pub_keys = HashMap::new(); - for validator_extend in validators.iter() { - let address = validator_extend.pub_key.as_bytes(); - let hex_pubkey = hex_decode(&validator_extend.bls_pub_key.as_string_trim0x()).unwrap(); - let pub_key = BlsPublicKey::try_from(hex_pubkey.as_ref()) - .map_err(MainError::Crypto) - .unwrap(); - bls_pub_keys.insert(address, pub_key); - } + Ok(partial_genesis) +} - Arc::new(OverlordCrypto::new( - bls_priv_key, - bls_pub_keys, - String::new(), - )) - } +async fn execute_genesis_temporarily( + mut partial_genesis: RichBlock, + spec: ChainSpec, + config: &ConfigRocksDB, + triedb_cache_size: usize, +) -> ProtocolResult { + let tmp_dir = tempfile::tempdir().map_err(|err| { + let err_msg = format!("failed to create temporary directory since {err:?}"); + MainError::Other(err_msg) + })?; + let path_block = tmp_dir.path().join("block"); - async fn get_status_agent( - &self, - storage: &Arc, - block: &Block, - metadata: &Metadata, - ) -> StatusAgent { - let header = &block.header; - let latest_proof = storage.get_latest_proof(Context::new()).await.unwrap(); - let current_consensus_status = CurrentStatus { - prev_hash: block.hash(), - last_number: header.number, - max_tx_size: metadata.consensus_config.max_tx_size.into(), - tx_num_limit: metadata.consensus_config.tx_num_limit, - proof: latest_proof, - last_state_root: if header.number == 0 { - self.state_root - } else { - header.state_root - }, - }; + let db_group = DatabaseGroup::new(config, path_block, triedb_cache_size)?; - CHAIN_ID.swap(Arc::new(header.chain_id)); + let resp = execute_transactions(&partial_genesis, &db_group, &spec.accounts)?; - StatusAgent::new(current_consensus_status) - } + partial_genesis.block.header.state_root = resp.state_root; + partial_genesis.block.header.receipts_root = resp.receipt_root; - async fn init_overlord_consensus( - &self, - status_agent: &StatusAgent, - txs_wal: &Arc, - crypto: &Arc, - lock: &Arc>, - consensus_adapter: &Arc>, - ) -> Arc>> - where - M: MemPool + 'static, - N: Rpc + PeerTrust + Gossip + Network + 'static, - S: Storage + 'static, - DB: TrieDB + Send + Sync + 'static, - { - let consensus_wal_path = self - .config - .data_path_for_consensus_wal() - .to_str() - .unwrap() - .to_string(); - let consensus_wal = Arc::new(ConsensusWal::new(consensus_wal_path)); - - let my_privkey = Secp256k1PrivateKey::try_from(self.config.privkey.as_ref()) - .map_err(MainError::Crypto) - .unwrap(); - let node_info = NodeInfo::new(self.genesis.block.header.chain_id, my_privkey.pub_key()); - - Arc::new( - OverlordConsensus::new( - status_agent.clone(), - node_info, - Arc::clone(crypto), - Arc::clone(txs_wal), - Arc::clone(consensus_adapter), - Arc::clone(lock), - Arc::clone(&consensus_wal), - ) - .await, - ) - } + log::info!("The genesis block is executed {:?}", partial_genesis.block); + log::info!("Response for genesis is {:?}", resp); - fn run_overlord_consensus( - metadata: Metadata, - validators: Vec, - current_block: Block, - overlord_consensus: Arc>>, - ) where - M: MemPool, - N: Rpc + PeerTrust + Gossip + Network + 'static, - S: Storage, - DB: TrieDB + Send + Sync, - { - let timer_config = DurationConfig { - propose_ratio: metadata.consensus_config.propose_ratio, - prevote_ratio: metadata.consensus_config.prevote_ratio, - precommit_ratio: metadata.consensus_config.precommit_ratio, - brake_ratio: metadata.consensus_config.brake_ratio, - }; - - tokio::spawn(async move { - if let Err(e) = overlord_consensus - .run( - current_block.header.number, - metadata.consensus_config.interval, - validators, - Some(timer_config), - ) - .await - { - log::error!("axon-consensus: {:?} error", e); - } - }); - } + Ok(partial_genesis) } -fn execute_transactions( +fn execute_transactions( rich: &RichBlock, - storage: &Arc, - trie_db: &Arc, - inner_db: &Arc, + db_group: &DatabaseGroup, accounts: &[InitialAccount], -) -> ProtocolResult -where - S: Storage + 'static, -{ - let state_root = MPTTrie::new(Arc::clone(&trie_db)) - .insert_accounts(accounts)? +) -> ProtocolResult { + let state_root = MPTTrie::new(db_group.trie_db()) + .insert_accounts(accounts) + .expect("insert accounts") .commit()?; let executor = AxonExecutor::default(); let mut backend = AxonExecutorApplyAdapter::from_root( state_root, - Arc::clone(trie_db), - Arc::clone(storage), + db_group.trie_db(), + db_group.storage(), Proposal::new_without_state_root(&rich.block.header).into(), )?; - system_contract::init(Arc::clone(inner_db), &mut backend); + system_contract::init(db_group.inner_db(), &mut backend); let resp = executor.exec(&mut backend, &rich.txs, &[]); diff --git a/core/run/src/tests.rs b/core/run/src/tests.rs index 7ea53ca32..705ca33e3 100644 --- a/core/run/src/tests.rs +++ b/core/run/src/tests.rs @@ -18,7 +18,7 @@ use protocol::{ types::{RichBlock, H256}, }; -use crate::{execute_transactions, init_storage}; +use crate::{execute_transactions, DatabaseGroup}; const DEV_CONFIG_DIR: &str = "../../devtools/chain"; @@ -129,21 +129,15 @@ async fn check_genesis_data<'a>(case: &TestCase<'a>) { ); } let path_block = tmp_dir.path().join("block"); - let (storage, trie_db, inner_db) = init_storage( + let db_group = DatabaseGroup::new( &config.rocksdb, path_block, config.executor.triedb_cache_size, ) - .expect("initialize storage"); - - let resp = execute_transactions( - &genesis, - &storage, - &trie_db, - &inner_db, - &chain_spec.accounts, - ) - .expect("execute transactions"); + .expect("initialize databases"); + + let resp = execute_transactions(&genesis, &db_group, &chain_spec.accounts) + .expect("execute transactions"); check_hashes( case.chain_name,