Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Write block info when restoring sharded DB #15567

Merged
merged 2 commits into the base branch from the source branch
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions storage/aptosdb/src/backup/restore_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use aptos_storage_interface::{
db_ensure as ensure, state_store::state_delta::StateDelta, AptosDbError, Result,
};
use aptos_types::{
account_config::new_block_event_key,
contract_event::ContractEvent,
ledger_info::LedgerInfoWithSignatures,
proof::{
Expand Down Expand Up @@ -229,6 +230,22 @@ pub(crate) fn save_transactions_impl(
events,
&ledger_db_batch.event_db_batches,
)?;

if ledger_db.enable_storage_sharding() {
for (idx, txn_events) in events.iter().enumerate() {
for event in txn_events {
if let Some(event_key) = event.event_key() {
if *event_key == new_block_event_key() {
LedgerMetadataDb::put_block_info(
first_version + idx as Version,
event,
&ledger_db_batch.ledger_metadata_db_batches,
)?;
}
}
}
}
}
// insert changes in write set schema batch
for (idx, ws) in write_sets.iter().enumerate() {
WriteSetDb::put_write_set(
Expand Down
2 changes: 2 additions & 0 deletions storage/db-tool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ aptos-backup-cli = { workspace = true }
aptos-block-executor = { workspace = true }
aptos-config = { workspace = true }
aptos-db = { workspace = true, features = ["db-debugger"] }
aptos-db-indexer = { workspace = true }
aptos-executor = { workspace = true }
aptos-executor-types = { workspace = true }
aptos-logger = { workspace = true }
Expand All @@ -35,3 +36,4 @@ tokio = { workspace = true }
aptos-backup-cli = { workspace = true, features = ["testing"] }
aptos-backup-service = { workspace = true }
aptos-executor-test-helpers = { workspace = true }
aptos-indexer-grpc-table-info = { workspace = true }
134 changes: 101 additions & 33 deletions storage/db-tool/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ mod dbtool_tests {
storage::{local_fs::LocalFs, BackupStorage},
utils::test_utils::start_local_backup_service,
};
use aptos_config::config::{RocksdbConfigs, StorageDirPaths};
use aptos_db::AptosDB;
use aptos_executor_test_helpers::integration_test_impl::{
test_execution_with_storage_impl, test_execution_with_storage_impl_inner,
Expand Down Expand Up @@ -288,10 +287,15 @@ mod dbtool_tests {
new_db_dir: PathBuf,
force_sharding: bool,
) -> (Runtime, String) {
use aptos_config::config::{
RocksdbConfigs, StorageDirPaths, BUFFERED_STATE_TARGET_ITEMS_FOR_TEST,
NO_OP_STORAGE_PRUNER_CONFIG,
};
use aptos_db::utils::iterators::PrefixedStateValueIterator;
use aptos_db_indexer::utils::PrefixedStateValueIterator as IndexerPrefixedStateValueIterator;
use aptos_indexer_grpc_table_info::internal_indexer_db_service::InternalIndexerDBService;
use itertools::zip_eq;

let db = test_execution_with_storage_impl_inner(force_sharding, old_db_dir.as_path());
let db = test_execution_with_storage_impl_inner(false, old_db_dir.as_path());
let (rt, port) = start_local_backup_service(Arc::clone(&db));
let server_addr = format!(" http://localhost:{}", port);
// Backup the local_test DB
Expand Down Expand Up @@ -443,7 +447,7 @@ mod dbtool_tests {
backup_dir.as_path().to_str().unwrap().to_string(),
];
if force_sharding {
let additional_args = vec!["--enable-storage-sharding"]
let additional_args = vec!["--enable-storage-sharding", "--enable-state-indices"]
.into_iter()
.map(|s| s.to_string())
.collect::<Vec<String>>();
Expand All @@ -461,49 +465,115 @@ mod dbtool_tests {
..Default::default()
}
};
let (_ledger_db, tree_db, state_kv_db) =
AptosDB::open_dbs(&StorageDirPaths::from_path(new_db_dir), db_config, false, 0)
.unwrap();

// assert the kv are the same in db and new_db
// current all the kv are still stored in the ledger db
//
for ver in start..=end {
let new_iter = PrefixedStateValueIterator::new(
&state_kv_db,

if !force_sharding {
let (_ledger_db, tree_db, state_kv_db) =
AptosDB::open_dbs(&StorageDirPaths::from_path(new_db_dir), db_config, false, 0)
.unwrap();
for ver in start..=end {
let new_iter = PrefixedStateValueIterator::new(
&state_kv_db,
StateKeyPrefix::new(AccessPath, b"".to_vec()),
None,
ver,
)
.unwrap();

let old_iter = db
.deref()
.get_prefixed_state_value_iterator(
&StateKeyPrefix::new(AccessPath, b"".to_vec()),
None,
ver,
)
.unwrap();

zip_eq(new_iter, old_iter).for_each(|(new, old)| {
let (new_key, new_value) = new.unwrap();
let (old_key, old_value) = old.unwrap();
assert_eq!(new_key, old_key);
assert_eq!(new_value, old_value);
});
// first snapshot tree not recovered
assert!(
tree_db.get_root_hash(0).is_err() || tree_db.get_leaf_count(0).unwrap() == 0,
"tree at version 0 should not be restored"
);
// second snapshot tree recovered
let second_snapshot_version: Version = 13;
assert!(
tree_db.get_root_hash(second_snapshot_version).is_ok(),
"root hash at version {} doesn't exist",
second_snapshot_version,
);
}
} else {
let internal_indexer_db =
InternalIndexerDBService::get_indexer_db_for_restore(new_db_dir.as_path()).unwrap();

let aptos_db: Arc<dyn DbReader> = Arc::new(
AptosDB::open(
StorageDirPaths::from_path(new_db_dir),
false,
NO_OP_STORAGE_PRUNER_CONFIG,
db_config,
false,
BUFFERED_STATE_TARGET_ITEMS_FOR_TEST,
1000,
Some(internal_indexer_db.clone()),
)
.unwrap(),
);

// Only state key at and by the snapshot version are restored in internal indexer
let snapshot_version = if start == 0 {
0
} else if start > 0 && start < 15 {
1
} else {
15
};

let new_iter = IndexerPrefixedStateValueIterator::new(
aptos_db.clone(),
internal_indexer_db.get_inner_db_ref(),
StateKeyPrefix::new(AccessPath, b"".to_vec()),
None,
ver,
snapshot_version,
)
.unwrap();

let old_iter = db
.deref()
.get_prefixed_state_value_iterator(
&StateKeyPrefix::new(AccessPath, b"".to_vec()),
None,
ver,
snapshot_version,
)
.unwrap();

zip_eq(new_iter, old_iter).for_each(|(new, old)| {
let (new_key, new_value) = new.unwrap();
let (old_key, old_value) = old.unwrap();
assert_eq!(new_key, old_key);
assert_eq!(new_value, old_value);
});
// collect all the keys in the new_iter
let mut new_keys = new_iter.map(|e| e.unwrap().0).collect::<Vec<_>>();
new_keys.sort();
let mut old_keys = old_iter.map(|e| e.unwrap().0).collect::<Vec<_>>();
old_keys.sort();
assert_eq!(new_keys, old_keys);

let ledger_version = aptos_db.get_latest_ledger_info_version().unwrap();
for ver in start..=ledger_version {
let old_block_res = db.get_block_info_by_version(ver);
let new_block_res = aptos_db.get_block_info_by_version(ver);
let (old_block_version, old_block_height, _) = old_block_res.unwrap();
let (new_block_version, new_block_height, _) = new_block_res.unwrap();
assert_eq!(old_block_version, new_block_version);
assert_eq!(old_block_height, new_block_height);
}
}
// first snapshot tree not recovered
assert!(
tree_db.get_root_hash(0).is_err() || tree_db.get_leaf_count(0).unwrap() == 0,
"tree at version 0 should not be restored"
);
// second snapshot tree recovered
let second_snapshot_version: Version = 13;
assert!(
tree_db.get_root_hash(second_snapshot_version).is_ok(),
"root hash at version {} doesn't exist",
second_snapshot_version,
);

(rt, server_addr)
}
#[test]
Expand Down Expand Up @@ -589,16 +659,14 @@ mod dbtool_tests {
}

#[test]
#[ignore]
// TODO(grao): Re-enable this test.
fn test_restore_with_sharded_db() {
let backup_dir = TempPath::new();
backup_dir.create_as_dir().unwrap();
let new_db_dir = TempPath::new();
let old_db_dir = TempPath::new();

let (rt, _) = db_restore_test_setup(
16,
0,
16,
PathBuf::from(backup_dir.path()),
PathBuf::from(old_db_dir.path()),
Expand Down
2 changes: 1 addition & 1 deletion storage/indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub mod db_v2;
pub mod event_v2_translator;
pub mod indexer_reader;
mod metrics;
mod utils;
pub mod utils;

use crate::db::INDEX_DB_NAME;
use aptos_config::config::RocksdbConfig;
Expand Down
Loading