Skip to content

Commit

Permalink
fix_get_right_most_child
Browse files Browse the repository at this point in the history
  • Loading branch information
areshand committed Oct 8, 2024
1 parent 2deaf9f commit 024b426
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 47 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion storage/aptosdb/src/db/test_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub(crate) fn update_store(
store: &StateStore,
input: impl Iterator<Item = (StateKey, Option<StateValue>)>,
first_version: Version,
enable_sharding: bool,
) -> HashValue {
use aptos_storage_interface::{jmt_update_refs, jmt_updates};
let mut root_hash = *aptos_crypto::hash::SPARSE_MERKLE_PLACEHOLDER_HASH;
Expand Down Expand Up @@ -94,7 +95,7 @@ pub(crate) fn update_store(
None,
&ledger_batch,
&sharded_state_kv_batches,
/*put_state_value_indices=*/ false,
/*put_state_value_indices=*/ enable_sharding,
/*skip_usage=*/ false,
/*last_checkpoint_index=*/ None,
)
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/pruner/state_merkle_pruner/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ fn verify_state_value_pruner(inputs: Vec<Vec<(StateKey, Option<StateValue>)>>) {
user_pruning_window_offset: 0,
});
for batch in inputs {
update_store(store, batch.clone().into_iter(), version);
update_store(store, batch.clone().into_iter(), version, false);
for (k, v) in batch.iter() {
if let Some((old_version, old_v_opt)) =
current_state_values.insert(k.clone(), (version, v.clone()))
Expand Down
40 changes: 7 additions & 33 deletions storage/aptosdb/src/state_merkle_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ use aptos_config::config::{RocksdbConfig, RocksdbConfigs, StorageDirPaths};
use aptos_crypto::{hash::CryptoHash, HashValue};
use aptos_experimental_runtimes::thread_manager::{optimal_min_len, THREAD_MANAGER};
use aptos_jellyfish_merkle::{
node_type::{NodeKey, NodeType},
JellyfishMerkleTree, TreeReader, TreeUpdateBatch, TreeWriter,
node_type::NodeKey, JellyfishMerkleTree, TreeReader, TreeUpdateBatch, TreeWriter,
};
use aptos_logger::prelude::*;
use aptos_rocksdb_options::gen_rocksdb_options;
use aptos_schemadb::{SchemaBatch, DB};
#[cfg(test)]
use aptos_scratchpad::get_state_shard_id;
use aptos_storage_interface::{db_ensure as ensure, AptosDbError, Result};
use aptos_storage_interface::{db_ensure as ensure, db_other_bail as bail, AptosDbError, Result};
use aptos_types::{
nibble::{nibble_path::NibblePath, ROOT_NIBBLE_HEIGHT},
proof::{SparseMerkleProofExt, SparseMerkleRangeProof},
Expand Down Expand Up @@ -677,20 +676,6 @@ impl StateMerkleDb {
) -> Result<Option<(NodeKey, LeafNode)>> {
let mut ret = None;

if self.enable_sharding {
let mut iter = self.metadata_db().iter::<JellyfishMerkleNodeSchema>()?;
iter.seek(&(version, 0)).unwrap();
// early exit if no node is found for the target version
match iter.next().transpose()? {
Some((node_key, node)) => {
if node.node_type() == NodeType::Null || node_key.version() != version {
return Ok(None);
}
},
None => return Ok(None),
};
}

// traverse all shards in a naive way
let shards = 0..self.hack_num_real_shards();
let start_num_of_nibbles = if self.enable_sharding { 1 } else { 0 };
Expand Down Expand Up @@ -760,7 +745,11 @@ impl StateMerkleDb {

if let Some((node_key, node)) = iter.next().transpose()? {
if node_key.version() != version {
continue;
bail!(
"node key version {} != version {}",
node_key.version(),
version
);
}
if let Node::Leaf(leaf_node) = node {
match ret {
Expand Down Expand Up @@ -822,21 +811,6 @@ impl TreeReader<StateKey> for StateMerkleDb {
}

fn get_rightmost_leaf(&self, version: Version) -> Result<Option<(NodeKey, LeafNode)>> {
// Since everything has the same version during restore, we seek to the first node and get
// its version.

let mut iter = self.metadata_db().iter::<JellyfishMerkleNodeSchema>()?;
// get the root node corresponding to the version
iter.seek(&(version, 0))?;
match iter.next().transpose()? {
Some((node_key, node)) => {
if node.node_type() == NodeType::Null || node_key.version() != version {
return Ok(None);
}
},
None => return Ok(None),
};

let ret = None;
let shards = 0..self.hack_num_real_shards();

Expand Down
69 changes: 61 additions & 8 deletions storage/aptosdb/src/state_store/state_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,55 @@ proptest! {
);
}

#[test]
fn test_get_rightmost_leaf_with_sharding(
(input, batch1_size) in hash_map(any::<StateKey>(), any::<StateValue>(), 2..1000)
.prop_flat_map(|input| {
let len = input.len();
(Just(input), 1..len)
})
) {
let tmp_dir1 = TempPath::new();
let db1 = AptosDB::new_for_test_with_sharding(&tmp_dir1, 1000);
let store1 = &db1.state_store;
init_sharded_store(store1, input.clone().into_iter());

let version = (input.len() - 1) as Version;
let expected_root_hash = store1.get_root_hash(version).unwrap();

let tmp_dir2 = TempPath::new();
let db2 = AptosDB::new_for_test_with_sharding(&tmp_dir2, 1000);


let store2 = &db2.state_store;
let mut restore =
StateSnapshotRestore::new(&store2.state_merkle_db, store2, version, expected_root_hash, true, /* async_commit */ StateSnapshotRestoreMode::Default).unwrap();

let mut ordered_input: Vec<_> = input
.into_iter()
.collect();
ordered_input.sort_unstable_by_key(|(key, _value)| key.hash());

let batch1: Vec<_> = ordered_input
.into_iter()
.take(batch1_size)
.collect();
let rightmost_of_batch1 = batch1.last().map(|(key, _value)| key.hash()).unwrap();
let proof_of_batch1 = store1
.get_value_range_proof(rightmost_of_batch1, version)
.unwrap();

restore.add_chunk(batch1, proof_of_batch1).unwrap();
restore.wait_for_async_commit().unwrap();

let expected = store2.state_merkle_db.get_rightmost_leaf_naive(version).unwrap();
// When re-initializing the store, the rightmost leaf should exist indicating the progress
let actual = store2.state_merkle_db.get_rightmost_leaf(version).unwrap();
// ensure the rightmost leaf is not None
prop_assert!(actual.is_some());
prop_assert_eq!(actual, expected);
}

#[test]
fn test_get_rightmost_leaf(
(input, batch1_size) in hash_map(any::<StateKey>(), any::<StateValue>(), 2..1000)
Expand All @@ -484,15 +533,9 @@ proptest! {
let tmp_dir2 = TempPath::new();
let db2 = AptosDB::new_for_test(&tmp_dir2);
let store2 = &db2.state_store;
let max_hash = HashValue::new([0xff; HashValue::LENGTH]);
let mut restore =
StateSnapshotRestore::new(&store2.state_merkle_db, store2, version, expected_root_hash, true, /* async_commit */ StateSnapshotRestoreMode::Default).unwrap();

let dummy_state_key = StateKey::raw(&[]);
let (top_levels_batch, sharded_batches, _) = store2.state_merkle_db.merklize_value_set(vec![(max_hash, Some(&(HashValue::random(), dummy_state_key)))], 0, None, None).unwrap();
store2.state_merkle_db.commit(version, top_levels_batch, sharded_batches).unwrap();
assert!(store2.state_merkle_db.get_rightmost_leaf(version).unwrap().is_none());

let mut ordered_input: Vec<_> = input
.into_iter()
.collect();
Expand All @@ -512,6 +555,7 @@ proptest! {

let expected = store2.state_merkle_db.get_rightmost_leaf_naive(version).unwrap();
let actual = store2.state_merkle_db.get_rightmost_leaf(version).unwrap();

prop_assert_eq!(actual, expected);
}

Expand All @@ -526,7 +570,7 @@ proptest! {
let mut version = 0;
for batch in input {
let next_version = version + batch.len() as Version;
let root_hash = update_store(store, batch.into_iter(), version);
let root_hash = update_store(store, batch.into_iter(), version, false);

let last_version = next_version - 1;
let snapshot = db
Expand Down Expand Up @@ -574,5 +618,14 @@ proptest! {

// Initializes the state store by inserting one key at each version.
fn init_store(store: &StateStore, input: impl Iterator<Item = (StateKey, StateValue)>) {
update_store(store, input.into_iter().map(|(k, v)| (k, Some(v))), 0);
update_store(
store,
input.into_iter().map(|(k, v)| (k, Some(v))),
0,
false,
);
}

fn init_sharded_store(store: &StateStore, input: impl Iterator<Item = (StateKey, StateValue)>) {
update_store(store, input.into_iter().map(|(k, v)| (k, Some(v))), 0, true);
}

0 comments on commit 024b426

Please sign in to comment.