Skip to content

Commit

Permalink
debug
Browse files Browse the repository at this point in the history
  • Loading branch information
areshand committed Dec 11, 2024
1 parent bc4364b commit 0e4c1b7
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 41 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions storage/aptosdb/src/backup/restore_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use aptos_storage_interface::{
db_ensure as ensure, state_store::state_delta::StateDelta, AptosDbError, Result,
};
use aptos_types::{
account_config::new_block_event_key,
contract_event::ContractEvent,
ledger_info::LedgerInfoWithSignatures,
proof::{
Expand Down Expand Up @@ -228,6 +229,22 @@ pub(crate) fn save_transactions_impl(
events,
&ledger_db_batch.event_db_batches,
)?;

if ledger_db.enable_storage_sharding() {
for (idx, txn_events) in events.iter().enumerate() {
for event in txn_events {
if let Some(event_key) = event.event_key() {
if *event_key == new_block_event_key() {
LedgerMetadataDb::put_block_info(
first_version + idx as Version,
event,
&ledger_db_batch.ledger_metadata_db_batches,
)?;
}
}
}
}
}
// insert changes in write set schema batch
for (idx, ws) in write_sets.iter().enumerate() {
WriteSetDb::put_write_set(
Expand Down
14 changes: 14 additions & 0 deletions storage/aptosdb/src/ledger_db/ledger_metadata_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,20 @@ impl LedgerMetadataDb {
/// Returns the corresponding block height for a given version.
pub(crate) fn get_block_height_by_version(&self, version: Version) -> Result<u64> {
let mut iter = self.db.iter::<BlockByVersionSchema>()?;
// Debug: print the first few BlockByVersionSchema entries to inspect the index contents
iter.seek_to_first();
let mut i = 0;
while i < 10 {
let (block_version, block_height) = iter
.next()
.transpose()?
.ok_or_else(|| anyhow!("Block is not found at version {version}, maybe pruned?"))?;
println!(
"block_version: {}, block_height: {}",
block_version, block_height
);
i += 1;
}

iter.seek_for_prev(&version)?;
let (_, block_height) = iter
Expand Down
2 changes: 2 additions & 0 deletions storage/db-tool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ aptos-backup-cli = { workspace = true }
aptos-block-executor = { workspace = true }
aptos-config = { workspace = true }
aptos-db = { workspace = true, features = ["db-debugger"] }
aptos-db-indexer = { workspace = true }
aptos-executor = { workspace = true }
aptos-executor-types = { workspace = true }
aptos-logger = { workspace = true }
Expand All @@ -35,3 +36,4 @@ tokio = { workspace = true }
aptos-backup-cli = { workspace = true, features = ["testing"] }
aptos-backup-service = { workspace = true }
aptos-executor-test-helpers = { workspace = true }
aptos-indexer-grpc-table-info = { workspace = true }
143 changes: 103 additions & 40 deletions storage/db-tool/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ mod dbtool_tests {
storage::{local_fs::LocalFs, BackupStorage},
utils::test_utils::start_local_backup_service,
};
use aptos_config::config::{RocksdbConfigs, StorageDirPaths};
use aptos_db::AptosDB;
use aptos_executor_test_helpers::integration_test_impl::{
test_execution_with_storage_impl, test_execution_with_storage_impl_inner,
Expand Down Expand Up @@ -288,10 +287,15 @@ mod dbtool_tests {
new_db_dir: PathBuf,
force_sharding: bool,
) -> (Runtime, String) {
use aptos_config::config::{
RocksdbConfigs, StorageDirPaths, BUFFERED_STATE_TARGET_ITEMS_FOR_TEST,
NO_OP_STORAGE_PRUNER_CONFIG,
};
use aptos_db::utils::iterators::PrefixedStateValueIterator;
use aptos_db_indexer::utils::PrefixedStateValueIterator as IndexerPrefixedStateValueIterator;
use aptos_indexer_grpc_table_info::internal_indexer_db_service::InternalIndexerDBService;
use itertools::zip_eq;

let db = test_execution_with_storage_impl_inner(force_sharding, old_db_dir.as_path());
let db = test_execution_with_storage_impl_inner(false, old_db_dir.as_path());
let (rt, port) = start_local_backup_service(Arc::clone(&db));
let server_addr = format!(" http://localhost:{}", port);
// Backup the local_test DB
Expand Down Expand Up @@ -443,7 +447,7 @@ mod dbtool_tests {
backup_dir.as_path().to_str().unwrap().to_string(),
];
if force_sharding {
let additional_args = vec!["--enable-storage-sharding"]
let additional_args = vec!["--enable-storage-sharding", "--enable-state-indices"]
.into_iter()
.map(|s| s.to_string())
.collect::<Vec<String>>();
Expand All @@ -461,49 +465,110 @@ mod dbtool_tests {
..Default::default()
}
};
let (_ledger_db, tree_db, state_kv_db) =
AptosDB::open_dbs(&StorageDirPaths::from_path(new_db_dir), db_config, false, 0)
.unwrap();

// assert the kv are the same in db and new_db
// currently all the kv pairs are still stored in the ledger db
//
for ver in start..=end {
let new_iter = PrefixedStateValueIterator::new(
&state_kv_db,
StateKeyPrefix::new(AccessPath, b"".to_vec()),
None,
ver,
)
.unwrap();
let old_iter = db
.deref()
.get_prefixed_state_value_iterator(
&StateKeyPrefix::new(AccessPath, b"".to_vec()),

if !force_sharding {
let (_ledger_db, tree_db, state_kv_db) =
AptosDB::open_dbs(&StorageDirPaths::from_path(new_db_dir), db_config, false, 0)
.unwrap();
for ver in start..=end {
let new_iter = PrefixedStateValueIterator::new(
&state_kv_db,
StateKeyPrefix::new(AccessPath, b"".to_vec()),
None,
ver,
)
.unwrap();

zip_eq(new_iter, old_iter).for_each(|(new, old)| {
let (new_key, new_value) = new.unwrap();
let (old_key, old_value) = old.unwrap();
assert_eq!(new_key, old_key);
assert_eq!(new_value, old_value);
});
let old_iter = db
.deref()
.get_prefixed_state_value_iterator(
&StateKeyPrefix::new(AccessPath, b"".to_vec()),
None,
ver,
)
.unwrap();

zip_eq(new_iter, old_iter).for_each(|(new, old)| {
let (new_key, new_value) = new.unwrap();
let (old_key, old_value) = old.unwrap();
assert_eq!(new_key, old_key);
assert_eq!(new_value, old_value);
});
// first snapshot tree not recovered
assert!(
tree_db.get_root_hash(0).is_err() || tree_db.get_leaf_count(0).unwrap() == 0,
"tree at version 0 should not be restored"
);
// second snapshot tree recovered
let second_snapshot_version: Version = 13;
assert!(
tree_db.get_root_hash(second_snapshot_version).is_ok(),
"root hash at version {} doesn't exist",
second_snapshot_version,
);
}
} else {
let internal_indexer_db =
InternalIndexerDBService::get_indexer_db_for_restore(new_db_dir.as_path()).unwrap();

let aptos_db: Arc<dyn DbReader> = Arc::new(
AptosDB::open(
StorageDirPaths::from_path(new_db_dir),
false,
NO_OP_STORAGE_PRUNER_CONFIG,
db_config,
false,
BUFFERED_STATE_TARGET_ITEMS_FOR_TEST,
1000,
Some(internal_indexer_db.clone()),
)
.unwrap(),
);

for ver in start..=end {
let new_iter = IndexerPrefixedStateValueIterator::new(
aptos_db.clone(),
&internal_indexer_db.get_inner_db_ref(),
StateKeyPrefix::new(AccessPath, b"".to_vec()),
None,
Version::MAX,
)
.unwrap();

let old_iter = db
.deref()
.get_prefixed_state_value_iterator(
&StateKeyPrefix::new(AccessPath, b"".to_vec()),
None,
ver,
)
.unwrap();

// collect all the keys in the new_iter
let mut new_keys = new_iter.map(|e| e.unwrap().0).collect::<Vec<_>>();
new_keys.sort();
let mut old_keys = old_iter.map(|e| e.unwrap().0).collect::<Vec<_>>();
old_keys.sort();
assert!(new_keys.len() > 0, "new_keys is empty");
assert_eq!(new_keys, old_keys);

let old_block_res = db.get_block_info_by_version(ver);
let new_block_res = aptos_db.get_block_info_by_version(ver);
assert!(old_block_res.is_ok());
assert!(new_block_res.is_ok());
let (old_block_version, old_block_height, _) = old_block_res.unwrap();
let (new_block_version, new_block_height, _) = new_block_res.unwrap();
assert_eq!(old_block_version, new_block_version);
assert_eq!(old_block_height, new_block_height);
}


}
// first snapshot tree not recovered
assert!(
tree_db.get_root_hash(0).is_err() || tree_db.get_leaf_count(0).unwrap() == 0,
"tree at version 0 should not be restored"
);
// second snapshot tree recovered
let second_snapshot_version: Version = 13;
assert!(
tree_db.get_root_hash(second_snapshot_version).is_ok(),
"root hash at version {} doesn't exist",
second_snapshot_version,
);

(rt, server_addr)
}
#[test]
Expand Down Expand Up @@ -589,16 +654,14 @@ mod dbtool_tests {
}

#[test]
#[ignore]
// TODO(grao): Re-enable this test.
fn test_restore_with_sharded_db() {
let backup_dir = TempPath::new();
backup_dir.create_as_dir().unwrap();
let new_db_dir = TempPath::new();
let old_db_dir = TempPath::new();

let (rt, _) = db_restore_test_setup(
16,
0,
16,
PathBuf::from(backup_dir.path()),
PathBuf::from(old_db_dir.path()),
Expand Down
2 changes: 1 addition & 1 deletion storage/indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub mod db_ops;
pub mod db_v2;
pub mod indexer_reader;
mod metrics;
mod utils;
pub mod utils;

use crate::db::INDEX_DB_NAME;
use aptos_config::config::RocksdbConfig;
Expand Down

0 comments on commit 0e4c1b7

Please sign in to comment.