Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
feat: Introduce storage migrations via ImportStorage/ExportStorage tr…
Browse files Browse the repository at this point in the history
…aits, and other trait-based operations.
  • Loading branch information
jsantell committed Sep 21, 2023
1 parent 36cf25e commit c3d5144
Show file tree
Hide file tree
Showing 23 changed files with 866 additions and 184 deletions.
5 changes: 2 additions & 3 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
},
"rust-analyzer.cargo.features": [
"test-kubo",
"helpers",
"performance"
"helpers"
]
}
}
2 changes: 1 addition & 1 deletion rust/noosphere-common/src/unshared.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::ConditionalSend;
use futures_util::Stream;

/// NOTE: This type was adapted from https://github.com/Nullus157/async-compression/blob/main/src/unshared.rs
/// NOTE: This type was adapted from <https://github.com/Nullus157/async-compression/blob/main/src/unshared.rs>
/// Original implementation licensed MIT/Apache 2
///
/// Wraps a type and only allows unique borrowing, the main usecase is to wrap a `!Sync` type and
Expand Down
20 changes: 9 additions & 11 deletions rust/noosphere-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,28 @@ readme = "README.md"
anyhow = { workspace = true }
async-trait = "~0.1"
async-stream = { workspace = true }
tokio-stream = { workspace = true }
base64 = "=0.21.2"
cid = { workspace = true }
noosphere-common = { version = "0.1.0", path = "../noosphere-common" }
tracing = "~0.1"
ucan = { workspace = true }
instant = { version = "0.1.12", features = ["wasm-bindgen"] }
libipld-core = { workspace = true }
libipld-cbor = { workspace = true }
noosphere-common = { version = "0.1.0", path = "../noosphere-common" }
rand = { workspace = true }
serde = { workspace = true }
base64 = "=0.21.2"
tokio-stream = { workspace = true }
tracing = "~0.1"
ucan = { workspace = true }
url = { version = "^2" }
witty-phrase-generator = "~0.2"

[dev-dependencies]
witty-phrase-generator = "~0.2"
wasm-bindgen-test = { workspace = true }
rand = { workspace = true }
noosphere-core-dev = { path = "../noosphere-core", features = ["helpers"], package = "noosphere-core" }
noosphere-common = { path = "../noosphere-common", features = ["helpers"] }
instant = { version = "0.1.12", features = ["wasm-bindgen"] }

[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
tempfile = { workspace = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
sled = "~0.34"
tempfile = { workspace = true }
tokio = { workspace = true, features = ["full"] }
rocksdb = { version = "0.21.0", optional = true }

Expand Down
6 changes: 2 additions & 4 deletions rust/noosphere-storage/examples/bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,15 @@ impl BenchmarkStorage {
))]
let (storage, storage_name) = {
(
noosphere_storage::SledStorage::new(noosphere_storage::SledStorageInit::Path(
storage_path.into(),
))?,
noosphere_storage::SledStorage::new(&storage_path)?,
"SledDbStorage",
)
};

#[cfg(all(not(target_arch = "wasm32"), feature = "rocksdb"))]
let (storage, storage_name) = {
(
noosphere_storage::RocksDbStorage::new(storage_path.into())?,
noosphere_storage::RocksDbStorage::new(&storage_path)?,
"RocksDbStorage",
)
};
Expand Down
157 changes: 157 additions & 0 deletions rust/noosphere-storage/src/backup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use crate::storage::Storage;
use anyhow::Result;
use async_trait::async_trait;
use noosphere_common::ConditionalSend;
use std::path::{Path, PathBuf};

#[cfg(not(target_arch = "wasm32"))]
use crate::FsBackedStorage;

#[cfg(not(target_arch = "wasm32"))]
fn create_backup_path<P: AsRef<Path>>(path: P) -> Result<PathBuf> {
use instant::SystemTime;
use rand::Rng;

let mut path = path.as_ref().to_owned();
let timestamp = SystemTime::UNIX_EPOCH
.elapsed()
.map_err(|_| anyhow::anyhow!("Could not generate timestamp."))?
.as_secs();
let nonce = rand::thread_rng().gen::<u32>();
path.set_extension(format!("backup.{}-{}", timestamp, nonce));
Ok(path)
}

/// [Storage] that can be backed up and restored.
/// [FsBackedStorage] types get a blanket implementation.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait BackupStorage: Storage {
/// Backup [Storage] located at `path`, moving to a backup location.
async fn backup<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<PathBuf>;
/// Backup [Storage] at `restore_to`, moving [Storage] from `backup_path` to `restore_to`.
async fn restore<P: AsRef<Path> + ConditionalSend, Q: AsRef<Path> + ConditionalSend>(
backup_path: P,
restore_to: Q,
) -> Result<PathBuf>;
/// List paths to backups for `path`.
async fn list_backups<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<Vec<PathBuf>>;
}

#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
impl<T> BackupStorage for T
where
T: FsBackedStorage,
{
async fn backup<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<PathBuf> {
let backup_path = create_backup_path(path.as_ref())?;
T::rename(path, &backup_path).await?;
Ok(backup_path)
}

async fn restore<P: AsRef<Path> + ConditionalSend, Q: AsRef<Path> + ConditionalSend>(
backup_path: P,
restore_to: Q,
) -> Result<PathBuf> {
let restoration_path = restore_to.as_ref().to_owned();
let original_backup = T::backup(&restoration_path).await?;
T::rename(backup_path, &restoration_path).await?;
Ok(original_backup)
}

async fn list_backups<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<Vec<PathBuf>> {
let mut backups = vec![];
let matcher = format!(
"{}.backup.",
path.as_ref()
.file_name()
.ok_or_else(|| anyhow::anyhow!("Could not stringify path."))?
.to_str()
.ok_or_else(|| anyhow::anyhow!("Could not stringify path."))?
);
let parent_dir = path
.as_ref()
.parent()
.ok_or_else(|| anyhow::anyhow!("Could not find storage parent directory."))?;
let mut stream = tokio::fs::read_dir(parent_dir).await?;
while let Ok(Some(entry)) = stream.next_entry().await {
if let Ok(file_name) = entry.file_name().into_string() {
if file_name.starts_with(&matcher) {
backups.push(entry.path());
}
}
}
Ok(backups)
}
}

#[cfg(all(not(target_arch = "wasm32"), test))]
mod test {
use crate::{OpenStorage, PreferredPlatformStorage, Store};

use super::*;

#[tokio::test]
pub async fn it_can_backup_storages() -> Result<()> {
noosphere_core_dev::tracing::initialize_tracing(None);

let temp_dir = tempfile::TempDir::new()?;
let db_source = temp_dir.path().join("db");

{
let storage = PreferredPlatformStorage::open(&db_source).await?;
let mut store = storage.get_key_value_store("links").await?;
store.write(b"1", b"1").await?;
}

let backup_1 = PreferredPlatformStorage::backup(&db_source).await?;

{
let storage = PreferredPlatformStorage::open(&db_source).await?;
let mut store = storage.get_key_value_store("links").await?;
assert!(store.read(b"1").await?.is_none(), "Backup is a move");
store.write(b"2", b"2").await?;
}

let backup_2 = PreferredPlatformStorage::backup(&db_source).await?;

{
let storage = PreferredPlatformStorage::open(&db_source).await?;
let mut store = storage.get_key_value_store("links").await?;
assert!(store.read(b"1").await?.is_none(), "Backup is a move");
assert!(store.read(b"2").await?.is_none(), "Backup is a move");
store.write(b"3", b"3").await?;
}

let backups = PreferredPlatformStorage::list_backups(&db_source).await?;
assert_eq!(backups.len(), 2);
assert!(backups.contains(&backup_1));
assert!(backups.contains(&backup_2));

let backup_3 = PreferredPlatformStorage::restore(&backup_1, &db_source).await?;
{
let storage = PreferredPlatformStorage::open(&db_source).await?;
let store = storage.get_key_value_store("links").await?;
assert_eq!(store.read(b"1").await?.unwrap(), b"1");
assert!(store.read(b"2").await?.is_none(), "Backup is a move");
assert!(store.read(b"3").await?.is_none(), "Backup is a move");
}

let backups = PreferredPlatformStorage::list_backups(db_source).await?;
assert_eq!(backups.len(), 2);
assert!(
backups.contains(&backup_3),
"contains backup from restoration."
);
assert!(
!backups.contains(&backup_1),
"moves backup that was restored."
);
assert!(
backups.contains(&backup_2),
"contains backups that were untouched."
);
Ok(())
}
}
6 changes: 6 additions & 0 deletions rust/noosphere-storage/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ use crate::{BlockStore, KeyValueStore, MemoryStore, Storage};

use async_stream::try_stream;

/// Key for the block store in a [SphereDb]'s [Storage].
pub const BLOCK_STORE: &str = "blocks";
/// Key for the link store in a [SphereDb]'s [Storage].
pub const LINK_STORE: &str = "links";
/// Key for the version store in a [SphereDb]'s [Storage].
pub const VERSION_STORE: &str = "versions";
/// Key for the metadata store in a [SphereDb]'s [Storage].
pub const METADATA_STORE: &str = "metadata";

/// All store keys used by [SphereDb].
pub const SPHERE_DB_STORE_NAMES: &[&str] =
&[BLOCK_STORE, LINK_STORE, VERSION_STORE, METADATA_STORE];

Expand All @@ -46,6 +51,7 @@ impl<S> SphereDb<S>
where
S: Storage,
{
/// Creates a new [SphereDb] using underlying `storage`.
pub async fn new(storage: &S) -> Result<SphereDb<S>> {
Ok(SphereDb {
block_store: storage.get_block_store(BLOCK_STORE).await?,
Expand Down
68 changes: 68 additions & 0 deletions rust/noosphere-storage/src/fs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use crate::storage::Storage;
use anyhow::Result;
use async_trait::async_trait;
use noosphere_common::ConditionalSend;
use std::path::Path;

/// [Storage] that is based on a file system. Implementing [FsBackedStorage]
/// provides blanket implementations for other trait-based [Storage] operations.
#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
pub trait FsBackedStorage: Storage + Sized {
/// Deletes the storage located at `path` directory. Returns `Ok(())` if
/// the directory is successfully removed, or if it already does not exist.
async fn delete<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<()> {
match std::fs::metadata(path.as_ref()) {
Ok(_) => std::fs::remove_dir_all(path.as_ref()).map_err(|e| e.into()),
Err(_) => Ok(()),
}
}

/// Moves the storage located at `from` to the `to` location.
async fn rename<P: AsRef<Path> + ConditionalSend, Q: AsRef<Path> + ConditionalSend>(
from: P,
to: Q,
) -> Result<()> {
std::fs::rename(from, to).map_err(|e| e.into())
}
}

/// [Storage] that is based on a file system.
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait FsBackedStorage: Storage + Sized {
/// Deletes the storage located at `path` directory. Returns `Ok(())` if
/// the directory is successfully removed, or if it already does not exist.
async fn delete<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<()>;

/// Moves the storage located at `from` to the `to` location.
async fn rename<P: AsRef<Path> + ConditionalSend, Q: AsRef<Path> + ConditionalSend>(
from: P,
to: Q,
) -> Result<()>;
}

#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
impl<T> crate::ops::DeleteStorage for T
where
T: FsBackedStorage,
{
async fn delete<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<()> {
<T as FsBackedStorage>::delete(path).await
}
}

#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
impl<T> crate::ops::RenameStorage for T
where
T: FsBackedStorage,
{
async fn rename<P: AsRef<Path> + ConditionalSend, Q: AsRef<Path> + ConditionalSend>(
from: P,
to: Q,
) -> Result<()> {
<T as FsBackedStorage>::rename(from, to).await
}
}
34 changes: 0 additions & 34 deletions rust/noosphere-storage/src/helpers.rs

This file was deleted.

Loading

0 comments on commit c3d5144

Please sign in to comment.