From d1230b991dd8cf56e2adcd3fded2602e349acf75 Mon Sep 17 00:00:00 2001 From: Guoteng Rao <3603304+grao1991@users.noreply.github.com> Date: Wed, 4 Dec 2024 23:37:36 -0800 Subject: [PATCH] [Storage] Open db in parallel when storage sharding is enabled. (#15504) --- storage/aptosdb/src/db_debugger/validation.rs | 2 +- storage/aptosdb/src/db_options.rs | 9 +- storage/aptosdb/src/ledger_db/mod.rs | 134 +++++++++++------- storage/aptosdb/src/state_kv_db.rs | 45 +++--- storage/aptosdb/src/state_merkle_db.rs | 25 +++- 5 files changed, 127 insertions(+), 88 deletions(-) diff --git a/storage/aptosdb/src/db_debugger/validation.rs b/storage/aptosdb/src/db_debugger/validation.rs index c06a61d27b56e..85c2022b8f50c 100644 --- a/storage/aptosdb/src/db_debugger/validation.rs +++ b/storage/aptosdb/src/db_debugger/validation.rs @@ -118,7 +118,7 @@ pub fn verify_state_kvs( ) -> Result<()> { println!("Validating db statekeys"); let storage_dir = StorageDirPaths::from_path(db_root_path); - let state_kv_db = StateKvDb::open(&storage_dir, RocksdbConfig::default(), false, true)?; + let state_kv_db = StateKvDb::open_sharded(&storage_dir, RocksdbConfig::default(), false)?; //read all statekeys from internal db and store them in mem let mut all_internal_keys = HashSet::new(); diff --git a/storage/aptosdb/src/db_options.rs b/storage/aptosdb/src/db_options.rs index e41de403b30b3..d703375e81521 100644 --- a/storage/aptosdb/src/db_options.rs +++ b/storage/aptosdb/src/db_options.rs @@ -221,15 +221,10 @@ pub(super) fn gen_state_merkle_cfds(rocksdb_config: &RocksdbConfig) -> Vec Vec { - let cfs = if enable_sharding { - state_kv_db_new_key_column_families() - } else { - state_kv_db_column_families() - }; + let cfs = state_kv_db_new_key_column_families(); gen_cfds(rocksdb_config, cfs, with_state_key_extractor_processor) } diff --git a/storage/aptosdb/src/ledger_db/mod.rs b/storage/aptosdb/src/ledger_db/mod.rs index da5d8377fbb26..96d886360bd3b 100644 --- a/storage/aptosdb/src/ledger_db/mod.rs +++ b/storage/aptosdb/src/ledger_db/mod.rs @@ -24,6 +24,7 @@ use crate::{ schema::db_metadata::{DbMetadataKey, DbMetadataSchema}, }; use aptos_config::config::{RocksdbConfig, RocksdbConfigs}; +use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; use aptos_logger::prelude::info; use aptos_rocksdb_options::gen_rocksdb_options; use aptos_schemadb::{ColumnFamilyDescriptor, ColumnFamilyName, SchemaBatch, DB}; @@ -155,60 +156,95 @@ impl LedgerDb { let ledger_db_folder = db_root_path.as_ref().join(LEDGER_DB_FOLDER_NAME); - let event_db_raw = Arc::new(Self::open_rocksdb( - ledger_db_folder.join(EVENT_DB_NAME), - EVENT_DB_NAME, - &rocksdb_configs.ledger_db_config, - readonly, - )?); - let event_db = EventDb::new(event_db_raw.clone(), EventStore::new(event_db_raw)); - - let transaction_accumulator_db = - TransactionAccumulatorDb::new(Arc::new(Self::open_rocksdb( - ledger_db_folder.join(TRANSACTION_ACCUMULATOR_DB_NAME), - TRANSACTION_ACCUMULATOR_DB_NAME, - &rocksdb_configs.ledger_db_config, - readonly, - )?)); - - let transaction_auxiliary_data_db = - TransactionAuxiliaryDataDb::new(Arc::new(Self::open_rocksdb( - ledger_db_folder.join(TRANSACTION_AUXILIARY_DATA_DB_NAME), - TRANSACTION_AUXILIARY_DATA_DB_NAME, - &rocksdb_configs.ledger_db_config, - readonly, - )?)); - let transaction_db = TransactionDb::new(Arc::new(Self::open_rocksdb( - ledger_db_folder.join(TRANSACTION_DB_NAME), - TRANSACTION_DB_NAME, - &rocksdb_configs.ledger_db_config, - readonly, - )?)); - - let transaction_info_db = TransactionInfoDb::new(Arc::new(Self::open_rocksdb( - ledger_db_folder.join(TRANSACTION_INFO_DB_NAME), - TRANSACTION_INFO_DB_NAME, - &rocksdb_configs.ledger_db_config, - readonly, - )?)); - - let write_set_db = WriteSetDb::new(Arc::new(Self::open_rocksdb( - ledger_db_folder.join(WRITE_SET_DB_NAME), - WRITE_SET_DB_NAME, - &rocksdb_configs.ledger_db_config, - readonly, - )?)); + let mut event_db = None; + let mut transaction_accumulator_db = None; + let mut transaction_auxiliary_data_db = None; + let mut transaction_db = None; + let mut transaction_info_db = None; + let mut write_set_db = None; + THREAD_MANAGER.get_non_exe_cpu_pool().scope(|s| { + s.spawn(|_| { + let event_db_raw = Arc::new( + Self::open_rocksdb( + ledger_db_folder.join(EVENT_DB_NAME), + EVENT_DB_NAME, + &rocksdb_configs.ledger_db_config, + readonly, + ) + .unwrap(), + ); + event_db = Some(EventDb::new( + event_db_raw.clone(), + EventStore::new(event_db_raw), + )); + }); + s.spawn(|_| { + transaction_accumulator_db = Some(TransactionAccumulatorDb::new(Arc::new( + Self::open_rocksdb( + ledger_db_folder.join(TRANSACTION_ACCUMULATOR_DB_NAME), + TRANSACTION_ACCUMULATOR_DB_NAME, + &rocksdb_configs.ledger_db_config, + readonly, + ) + .unwrap(), + ))); + }); + s.spawn(|_| { + transaction_auxiliary_data_db = Some(TransactionAuxiliaryDataDb::new(Arc::new( + Self::open_rocksdb( + ledger_db_folder.join(TRANSACTION_AUXILIARY_DATA_DB_NAME), + TRANSACTION_AUXILIARY_DATA_DB_NAME, + &rocksdb_configs.ledger_db_config, + readonly, + ) + .unwrap(), + ))) + }); + s.spawn(|_| { + transaction_db = Some(TransactionDb::new(Arc::new( + Self::open_rocksdb( + ledger_db_folder.join(TRANSACTION_DB_NAME), + TRANSACTION_DB_NAME, + &rocksdb_configs.ledger_db_config, + readonly, + ) + .unwrap(), + ))); + }); + s.spawn(|_| { + transaction_info_db = Some(TransactionInfoDb::new(Arc::new( + Self::open_rocksdb( + ledger_db_folder.join(TRANSACTION_INFO_DB_NAME), + TRANSACTION_INFO_DB_NAME, + &rocksdb_configs.ledger_db_config, + readonly, + ) + .unwrap(), + ))); + }); + s.spawn(|_| { + write_set_db = Some(WriteSetDb::new(Arc::new( + Self::open_rocksdb( + ledger_db_folder.join(WRITE_SET_DB_NAME), + WRITE_SET_DB_NAME, + &rocksdb_configs.ledger_db_config, + readonly, + ) + .unwrap(), + ))); + }); + }); // TODO(grao): Handle data inconsistency. Ok(Self { ledger_metadata_db: LedgerMetadataDb::new(ledger_metadata_db), - event_db, - transaction_accumulator_db, - transaction_auxiliary_data_db, - transaction_db, - transaction_info_db, - write_set_db, + event_db: event_db.unwrap(), + transaction_accumulator_db: transaction_accumulator_db.unwrap(), + transaction_auxiliary_data_db: transaction_auxiliary_data_db.unwrap(), + transaction_db: transaction_db.unwrap(), + transaction_info_db: transaction_info_db.unwrap(), + write_set_db: write_set_db.unwrap(), enable_storage_sharding: true, }) } diff --git a/storage/aptosdb/src/state_kv_db.rs b/storage/aptosdb/src/state_kv_db.rs index 35c1d01e3bbda..331b170084b5d 100644 --- a/storage/aptosdb/src/state_kv_db.rs +++ b/storage/aptosdb/src/state_kv_db.rs @@ -4,7 +4,7 @@ #![forbid(unsafe_code)] use crate::{ - db_options::gen_state_kv_cfds, + db_options::gen_state_kv_shard_cfds, metrics::OTHER_TIMERS_SECONDS, schema::{ db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue}, @@ -25,6 +25,7 @@ use aptos_types::{ transaction::Version, }; use arr_macro::arr; +use rayon::prelude::*; use std::{ path::{Path, PathBuf}, sync::Arc, @@ -56,19 +57,13 @@ impl StateKvDb { }); } - Self::open( - db_paths, - rocksdb_configs.state_kv_db_config, - readonly, - sharding, - ) + Self::open_sharded(db_paths, rocksdb_configs.state_kv_db_config, readonly) } - pub(crate) fn open( + pub(crate) fn open_sharded( db_paths: &StorageDirPaths, state_kv_db_config: RocksdbConfig, readonly: bool, - enable_sharding: bool, ) -> Result { let state_kv_metadata_db_path = Self::metadata_db_path(db_paths.state_kv_db_metadata_root_path()); @@ -78,7 +73,6 @@ impl StateKvDb { STATE_KV_METADATA_DB_NAME, &state_kv_db_config, readonly, - enable_sharding, )?); info!( @@ -86,15 +80,22 @@ impl StateKvDb { "Opened state kv metadata db!" ); - let mut shard_id: usize = 0; - let state_kv_db_shards = { - arr![{ + let state_kv_db_shards = (0..NUM_STATE_SHARDS) + .into_par_iter() + .map(|shard_id| { let shard_root_path = db_paths.state_kv_db_shard_root_path(shard_id as u8); - let db = Self::open_shard(shard_root_path, shard_id as u8, &state_kv_db_config, readonly, enable_sharding)?; - shard_id += 1; + let db = Self::open_shard( + shard_root_path, + shard_id as u8, + &state_kv_db_config, + readonly, + ) + .unwrap_or_else(|e| panic!("Failed to open state kv db shard {shard_id}: {e:?}.")); Arc::new(db) - }; 16] - }; + }) + .collect::>() + .try_into() + .unwrap(); let state_kv_db = Self { state_kv_metadata_db, @@ -171,11 +172,10 @@ impl StateKvDb { cp_root_path: impl AsRef, ) -> Result<()> { // TODO(grao): Support path override here. - let state_kv_db = Self::open( + let state_kv_db = Self::open_sharded( &StorageDirPaths::from_path(db_root_path), RocksdbConfig::default(), false, - true, )?; let cp_state_kv_db_path = cp_root_path.as_ref().join(STATE_KV_DB_FOLDER_NAME); @@ -247,7 +247,6 @@ impl StateKvDb { shard_id: u8, state_kv_db_config: &RocksdbConfig, readonly: bool, - enable_sharding: bool, ) -> Result { let db_name = format!("state_kv_db_shard_{}", shard_id); Self::open_db( @@ -255,7 +254,6 @@ impl StateKvDb { &db_name, state_kv_db_config, readonly, - enable_sharding, ) } @@ -264,21 +262,20 @@ impl StateKvDb { name: &str, state_kv_db_config: &RocksdbConfig, readonly: bool, - enable_sharding: bool, ) -> Result { Ok(if readonly { DB::open_cf_readonly( &gen_rocksdb_options(state_kv_db_config, true), path, name, - gen_state_kv_cfds(state_kv_db_config, enable_sharding), + gen_state_kv_shard_cfds(state_kv_db_config), )? } else { DB::open_cf( &gen_rocksdb_options(state_kv_db_config, false), path, name, - gen_state_kv_cfds(state_kv_db_config, enable_sharding), + gen_state_kv_shard_cfds(state_kv_db_config), )? }) } diff --git a/storage/aptosdb/src/state_merkle_db.rs b/storage/aptosdb/src/state_merkle_db.rs index 7d7ad649d0431..10d75f5b164f5 100644 --- a/storage/aptosdb/src/state_merkle_db.rs +++ b/storage/aptosdb/src/state_merkle_db.rs @@ -583,13 +583,24 @@ impl StateMerkleDb { "Opened state merkle metadata db!" ); - let mut shard_id: usize = 0; - let state_merkle_db_shards = arr![{ - let shard_root_path = db_paths.state_merkle_db_shard_root_path(shard_id as u8); - let db = Self::open_shard(shard_root_path, shard_id as u8, &state_merkle_db_config, readonly)?; - shard_id += 1; - Arc::new(db) - }; 16]; + let state_merkle_db_shards = (0..NUM_STATE_SHARDS) + .into_par_iter() + .map(|shard_id| { + let shard_root_path = db_paths.state_merkle_db_shard_root_path(shard_id as u8); + let db = Self::open_shard( + shard_root_path, + shard_id as u8, + &state_merkle_db_config, + readonly, + ) + .unwrap_or_else(|e| { + panic!("Failed to open state merkle db shard {shard_id}: {e:?}.") + }); + Arc::new(db) + }) + .collect::>() + .try_into() + .unwrap(); let state_merkle_db = Self { state_merkle_metadata_db,