diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 7c68dd1a34..41f190c0a0 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -22,7 +22,7 @@ jobs: # we must duplicate the rustflags from `.cargo/config.toml`. RUSTFLAGS: "-D warnings --cfg tokio_unstable" - name: Run tests with nextest - run: cargo nextest run --release + run: cargo nextest run --release --features migration env: CARGO_TERM_COLOR: always diff --git a/crates/storage/src/snapshot.rs b/crates/storage/src/snapshot.rs index d49c3a1e6b..92756228b3 100644 --- a/crates/storage/src/snapshot.rs +++ b/crates/storage/src/snapshot.rs @@ -81,11 +81,16 @@ impl Snapshot { .await? } - pub fn substore_version( - &self, - prefix: &Arc, - ) -> Option { - self.0.multistore_cache.get_version(prefix) + pub fn prefix_version(&self, prefix: &str) -> Result> { + let config = self + .0 + .multistore_cache + .config + .find_substore(prefix.as_bytes()); + if prefix != config.prefix { + anyhow::bail!("requested substore (prefix={prefix}) does not exist") + } + Ok(self.substore_version(&config)) } /// Returns the root hash of the subtree corresponding to the given prefix. @@ -108,7 +113,7 @@ impl Snapshot { // However, we do not want to mislead the caller by returning a root hash // that does not correspond to the queried prefix, so we error out instead. 
if prefix != config.prefix { - anyhow::bail!("requested substore does not exist") + anyhow::bail!("requested substore (prefix={prefix}) does not exist") } let version = self @@ -137,6 +142,13 @@ impl Snapshot { pub async fn root_hash(&self) -> Result { self.prefix_root_hash("").await } + + pub(crate) fn substore_version( + &self, + prefix: &Arc, + ) -> Option { + self.0.multistore_cache.get_version(prefix) + } } #[async_trait] diff --git a/crates/storage/src/snapshot_cache.rs b/crates/storage/src/snapshot_cache.rs index 4509ac2677..0ded1607d7 100644 --- a/crates/storage/src/snapshot_cache.rs +++ b/crates/storage/src/snapshot_cache.rs @@ -78,6 +78,11 @@ impl SnapshotCache { .map(Clone::clone) .filter(|s| s.version() == version) } + + /// Empties the cache. + pub fn clear(&mut self) { + self.cache.clear(); + } } #[cfg(test)] diff --git a/crates/storage/src/storage.rs b/crates/storage/src/storage.rs index e50b76891e..7931613321 100644 --- a/crates/storage/src/storage.rs +++ b/crates/storage/src/storage.rs @@ -273,20 +273,23 @@ impl Storage { anyhow::bail!("version mismatch in commit: expected state forked from version {} but found state forked from version {}", old_version, snapshot.version()); } - self.commit_inner(snapshot, changes, new_version, true) + self.commit_inner(snapshot, changes, new_version, false) .await } - /// Commits the provided [`StateDelta`] to persistent storage as the latest - /// version of the chain state. If `write_to_snapshot_cache` is `false`, the - /// snapshot will not be written to the snapshot cache, and no subscribers - /// will be notified. + /// Commits the supplied [`Cache`] to persistent storage. + /// + /// # Migrations + /// In the case of chain state migrations we need to commit the new state + /// without incrementing the version. If `perform_migration` is `true` the + /// snapshot will _not_ be written to the snapshot cache, and no subscribers + /// will be notified. Substore versions will not be updated. 
async fn commit_inner( &self, snapshot: Snapshot, cache: Cache, version: jmt::Version, - write_to_snapshot_cache: bool, + perform_migration: bool, ) -> Result { tracing::debug!(new_jmt_version = ?version, "committing state delta"); let mut changes_by_substore = cache.shard_by_prefix(&self.0.multistore_config); @@ -303,23 +306,24 @@ impl Storage { // its own changes to the batch, and we will commit it at the end. let mut write_batch = rocksdb::WriteBatch::default(); - // Note(erwan): since we work over sharded keyspaces/disjoint column families, - // it is tempting to consider a rewrite of this loop into a [`tokio::task::JoinSet`], - // however there are some complications that should be on your radar: - // * overhead: at the time of writing, there is a single digit number of substores, - // so it's implausible that the overhead of a joinset would be worth it. - // * atomicity: unfortunately, `WriteBatch`es are not thread safe, this means that - // to spin-up N tasks, we would either need to: - // Option A: use a single batch, and synchronize access to it between tasks. - // if the number of substore contention grows with the number of substores, - // which is likely the case if you're considering a joinset. - // Option B: use N batches, and find a way to commit to them atomically. - // (better, but not supported) RocksDB does not allow merging batches - // together, and though [`rocksdb::OptimisticTransactionDB`] offers an - // ACID API, it is not compatible with the [`rocksdb::WriteBatch`] API. - // A last option is to relax atomicity constraints, so that each commit task can produce - // its own independent batch, and we can commit them all at once. This means that each batch - // write is atomic, but the overall commit is not. + // Note(erwan): Here, we iterate over each substore sequentially and + // commit it. 
Since we know that the substore keyspace is disjoint, we + // could consider rewriting this loop into a [`tokio::task::JoinSet`], + // however consider that `rocksdb::WriteBatch` is _not_ thread-safe. + // + // This means that to spin-up N tasks, we would need to use a + // single batch wrapped in a mutex, or use N batches, and find + // a way to commit to them atomically. Since that is not supported + // by RocksDB, we would have to iterate over each entry in each + // batch, and merge them together. + // + // Another option is to trade atomicity for parallelism by producing + // N batches, and committing them in distinct atomic writes. This is + // dangerous because it could leave the node in an inconsistent state. + // + // Instead of doing that, we lean on the fact that the number of substores + // is small, and that the synchronization overhead of a joinset would exceed + // its benefits. for config in self.0.multistore_config.iter() { tracing::debug!(substore_prefix = ?config.prefix, "processing substore"); // If the substore is empty, we need to fetch its initialized version from the cache. @@ -338,7 +342,11 @@ impl Storage { continue; }; - let version = old_substore_version.wrapping_add(1); + let version = if perform_migration { + old_substore_version + } else { + old_substore_version.wrapping_add(1) + }; new_versions.push(version); let substore_snapshot = SubstoreSnapshot { config: config.clone(), @@ -416,7 +424,7 @@ impl Storage { multistore_versions.set_version(main_store_config, version); /* hydrate the snapshot cache */ - if !write_to_snapshot_cache { + if perform_migration { tracing::debug!("skipping snapshot cache update"); return Ok(global_root_hash); } @@ -444,11 +452,10 @@ impl Storage { #[cfg(feature = "migration")] /// Commits the provided [`StateDelta`] to persistent storage without increasing the version /// of the chain state. - /// TODO(erwan): with the addition of substores, we need to revisit this API. 
pub async fn commit_in_place(&self, delta: StateDelta) -> Result { let (snapshot, changes) = delta.flatten(); let old_version = self.latest_version(); - self.commit_inner(snapshot, changes, old_version, false) + self.commit_inner(snapshot, changes, old_version, true) .await } @@ -458,12 +465,12 @@ impl Storage { self.0.db.clone() } - #[cfg(test)] - /// Consumes the `Inner` storage and waits for all resources to be reclaimed. + /// Shuts down the database and the dispatcher task, and waits for all resources to be reclaimed. /// Panics if there are still outstanding references to the `Inner` storage. - pub(crate) async fn release(mut self) { + pub async fn release(mut self) { if let Some(inner) = Arc::get_mut(&mut self.0) { inner.shutdown().await; + inner.snapshots.write().clear(); // `Inner` is dropped once the call completes. } else { panic!("Unable to get mutable reference to Inner"); @@ -472,8 +479,7 @@ impl Storage { } impl Inner { - #[cfg(test)] - pub async fn shutdown(&mut self) { + pub(crate) async fn shutdown(&mut self) { if let Some(jh) = self.jh_dispatcher.take() { jh.abort(); let _ = jh.await; diff --git a/crates/storage/tests/migration.rs b/crates/storage/tests/migration.rs new file mode 100644 index 0000000000..86d097a654 --- /dev/null +++ b/crates/storage/tests/migration.rs @@ -0,0 +1,225 @@ +#![cfg(feature = "migration")] +use jmt::RootHash; +use penumbra_storage::StateDelta; +use penumbra_storage::StateRead; +use penumbra_storage::StateWrite; +use penumbra_storage::Storage; +use tempfile; +use tokio; + +/* + * Migration tests. + * + * Node operators perform network upgrades by running a migration of + * the chain state that preserves block height continuity. In order to + * enable this, we need to have a way to commit changes to our merkle + * tree, _without_ increasing its version number. + * + * With the addition of substores, we must cover the cases when migrations + * access data located in substores. 
+ * + * These integration tests enforce that a migration operation is able to + * write to both the main store and any number of substores without incrementing + * their version number. + * + */ + +#[tokio::test] +/// Test that we can commit to the main store without incrementing its version. +async fn test_simple_migration() -> anyhow::Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + let tmpdir = tempfile::tempdir()?; + let db_path = tmpdir.into_path(); + let substore_prefixes = vec![]; + let storage = Storage::load(db_path, substore_prefixes).await?; + + let mut counter = 0; + let num_writes = 10; + + for i in 0..num_writes { + let mut delta = StateDelta::new(storage.latest_snapshot()); + let key_1 = format!("key_{i}"); + let value_1 = format!("value_{i}").as_bytes().to_vec(); + delta.put_raw(key_1.clone(), value_1.clone()); + + let _ = storage.commit(delta).await?; + // Check that we can read the values back out. + let snapshot = storage.latest_snapshot(); + + let retrieved_value = snapshot.get_raw(key_1.as_str()).await?.unwrap(); + assert_eq!(retrieved_value, value_1); + counter += 1; + } + + let old_global_root = storage + .latest_snapshot() + .root_hash() + .await + .expect("infaillible"); + let old_version = storage.latest_version(); + assert_eq!(old_version, counter - 1); + + /* ********************* */ + /* perform the migration */ + /* ********************* */ + let mut delta = StateDelta::new(storage.latest_snapshot()); + let key_root_2 = "migration".to_string(); + let value_root_2 = "migration data".as_bytes().to_vec(); + delta.put_raw(key_root_2, value_root_2); + let new_global_root = storage.commit_in_place(delta).await?; + let new_version = storage.latest_version(); + + assert_ne!( + old_global_root, new_global_root, + "migration did not effect the root hash" + ); + + assert_eq!(old_version, new_version, "the version number has changed!"); + + assert_eq!(counter, num_writes); + + Ok(()) +} + +#[tokio::test] +/// Test that we can commit 
to substores without incrementing their version. +async fn test_substore_migration() -> anyhow::Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + let tmpdir = tempfile::tempdir()?; + let db_path = tmpdir.into_path(); + let substore_prefixes = vec!["ibc/".to_string(), "dex/".to_string(), "misc/".to_string()]; + let all_substores = vec!["ibc/".to_string(), "dex/".to_string(), "".to_string()]; + let storage = Storage::load(db_path.clone(), substore_prefixes.clone()).await?; + + let mut counter = 0; + let num_writes = 10; + + // Write to every substore, multiple times and check that we can read the values back out. + for i in 0..num_writes { + let mut delta = StateDelta::new(storage.latest_snapshot()); + let mut keys: Vec = vec![]; + let mut values: Vec> = vec![]; + for substore in all_substores.iter() { + let key = format!("{substore}key_{i}"); + let value = format!("{substore}value_{i}").as_bytes().to_vec(); + tracing::debug!(?key, "initializing substore {substore} with key-value pair"); + delta.put_raw(key.clone(), value.clone()); + keys.push(key); + values.push(value); + } + + let _ = storage.commit(delta).await?; + let snapshot = storage.latest_snapshot(); + + for (key, value) in keys.iter().zip(values.iter()) { + let retrieved_value = snapshot.get_raw(key.as_str()).await?.unwrap(); + assert_eq!(retrieved_value, *value); + } + + counter += 1; + } + assert_eq!(counter, num_writes); + + let premigration_snapshot = storage.latest_snapshot(); + let mut old_root_hashes: Vec = vec![]; + for substore in all_substores.iter() { + let root_hash = premigration_snapshot + .prefix_root_hash(substore.as_str()) + .await + .expect("prefix exists"); + old_root_hashes.push(root_hash); + } + + let old_substore_versions: Vec = all_substores + .clone() + .into_iter() + .map(|prefix| { + let old_version = premigration_snapshot + .prefix_version(prefix.as_str()) + .expect("prefix exists"); + old_version.expect("substore is initialized") + }) + .collect(); + + let old_version 
= storage.latest_version(); + assert_eq!(old_version, counter - 1); // -1 because we start at u64::MAX + drop(premigration_snapshot); + + /* ******************************* */ + /* perform the migration */ + /* (write a key in every substore) */ + /* ******************************* */ + let mut delta = StateDelta::new(storage.latest_snapshot()); + + // Start by writing a key in every substore, including the main store. + for substore in all_substores.iter() { + let key = format!("{substore}migration", substore = substore); + let value = format!("{substore}migration data", substore = substore) + .as_bytes() + .to_vec(); + tracing::debug!(?key, "migration: writing to substore {substore}"); + delta.put_raw(key, value); + } + + // Commit the migration. + let _ = storage.commit_in_place(delta).await?; + + // Note(erwan): when we perform a commit in-place, we do not update the + // snapshot cache. This means that querying `Storage::latest_snapshot()` + // will return a now-stale view of the state. + storage.release().await; + let storage = Storage::load(db_path, substore_prefixes).await?; + + let postmigration_snapshot = storage.latest_snapshot(); + let new_version = storage.latest_version(); + + assert_eq!( + old_version, new_version, + "the global version number has changed!" + ); + + // Check that the root hash for every substore has changed. + let mut new_root_hashes: Vec = vec![]; + for substore in all_substores.iter() { + let root_hash = postmigration_snapshot + .prefix_root_hash(substore.as_str()) + .await + .expect("prefix exists"); + new_root_hashes.push(root_hash); + } + + old_root_hashes + .iter() + .zip(new_root_hashes.iter()) + .zip(all_substores.iter()) + .for_each(|((old, new), substore)| { + assert_ne!( + old, new, + "migration did not effect the root hash for substore {substore}", + ); + }); + + // Check that the version number for every substore has NOT changed. 
+ let new_substore_versions: Vec = all_substores + .clone() + .into_iter() + .map(|prefix| { + let new_version = postmigration_snapshot + .prefix_version(prefix.as_str()) + .expect("prefix exists"); + new_version.expect("substore is initialized") + }) + .collect(); + + old_substore_versions + .iter() + .zip(new_substore_versions.iter()) + .zip(all_substores.iter()) + .for_each(|((old, new), substore)| { + assert_eq!( + old, new, + "the version number for substore {substore} has changed!", + ); + }); + Ok(()) +} diff --git a/crates/storage/tests/substore_tests.rs b/crates/storage/tests/substore_tests.rs index 552b1c861d..6cb01ec40f 100644 --- a/crates/storage/tests/substore_tests.rs +++ b/crates/storage/tests/substore_tests.rs @@ -311,7 +311,7 @@ async fn test_substore_prefix_keys() -> anyhow::Result<()> { let mut range = snapshot.prefix_raw(query_prefix); while let Some(res) = range.next().await { let (key, value) = res?; - tracing::debug!(?key, ?value, ?query_prefix, "iterating over key/value pair"); + tracing::debug!(?key, ?query_prefix, "iterating over key/value pair"); if counter >= kv_main.len() { tracing::debug!(?key, ?value, ?query_prefix, "unexpected key/value pair");