From 20f0fd705f82cc313ee2a7b90773a237d0317581 Mon Sep 17 00:00:00 2001 From: Jordan Santell Date: Thu, 21 Sep 2023 15:52:56 -0700 Subject: [PATCH] WIP: Introduce EphemeralStorage --- .vscode/settings.json | 3 +- Cargo.lock | 3 + rust/noosphere-common/Cargo.toml | 1 + rust/noosphere-common/src/helpers/wait.rs | 17 +- rust/noosphere-common/src/task.rs | 14 + rust/noosphere-core/src/authority/author.rs | 8 +- rust/noosphere-core/src/context/context.rs | 2 +- rust/noosphere-core/src/data/address.rs | 8 +- rust/noosphere-core/src/data/sphere.rs | 4 +- rust/noosphere-core/src/helpers/context.rs | 2 +- rust/noosphere-core/src/helpers/link.rs | 2 +- .../examples/notes-to-html/implementation.rs | 2 +- rust/noosphere-ipfs/src/storage.rs | 17 +- rust/noosphere-ns/src/builder.rs | 4 +- rust/noosphere-ns/src/dht_client.rs | 2 +- rust/noosphere-ns/src/name_system.rs | 6 +- rust/noosphere-ns/src/server/client.rs | 4 +- rust/noosphere-ns/tests/ns_test.rs | 4 +- rust/noosphere-storage/Cargo.toml | 13 +- rust/noosphere-storage/examples/bench/main.rs | 8 +- .../examples/bench/performance.rs | 15 +- rust/noosphere-storage/sleddb/conf | 4 + rust/noosphere-storage/sleddb/db | Bin 0 -> 524287 bytes .../sleddb/snap.00000000000019B0 | Bin 0 -> 123 bytes rust/noosphere-storage/src/db.rs | 83 ++++- rust/noosphere-storage/src/ephemeral.rs | 251 +++++++++++++++ rust/noosphere-storage/src/helpers.rs | 34 --- .../src/implementation/indexed_db.rs | 289 ++++++++++++++++-- .../src/implementation/memory.rs | 19 +- .../src/implementation/rocks_db.rs | 129 +++++++- .../src/implementation/sled.rs | 145 +++++++-- .../src/implementation/tracking.rs | 13 + rust/noosphere-storage/src/lib.rs | 38 ++- rust/noosphere-storage/src/non_persistent.rs | 88 ++++++ rust/noosphere-storage/src/ops.rs | 13 + rust/noosphere-storage/src/partitioned.rs | 82 +++++ rust/noosphere-storage/src/storage.rs | 5 +- rust/noosphere-storage/src/store.rs | 52 +++- rust/noosphere-storage/src/tap.rs | 2 +- rust/noosphere-storage/src/ucan.rs | 2 +- rust/noosphere/src/sphere/builder/mod.rs | 2 +- rust/noosphere/src/storage.rs | 6 +- 42 files changed, 1224 insertions(+), 172 deletions(-) create mode 100644 rust/noosphere-storage/sleddb/conf create mode 100644 rust/noosphere-storage/sleddb/db create mode 100644 rust/noosphere-storage/sleddb/snap.00000000000019B0 create mode 100644 rust/noosphere-storage/src/ephemeral.rs delete mode 100644 rust/noosphere-storage/src/helpers.rs create mode 100644 rust/noosphere-storage/src/non_persistent.rs create mode 100644 rust/noosphere-storage/src/ops.rs create mode 100644 rust/noosphere-storage/src/partitioned.rs diff --git a/.vscode/settings.json b/.vscode/settings.json index b41d47713..62025d3e2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -12,7 +12,6 @@ }, "rust-analyzer.cargo.features": [ "test-kubo", - "helpers", - "performance" + "helpers" ] } \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index ab662a1f8..26b6164d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3558,6 +3558,7 @@ dependencies = [ "anyhow", "futures", "futures-util", + "gloo-timers", "rand", "tokio", "tracing", @@ -3759,12 +3760,14 @@ dependencies = [ "async-trait", "base64 0.21.2", "cid", + "futures", "instant", "js-sys", "libipld-cbor", "libipld-core", "noosphere-common", "noosphere-core", + "num-traits", "rand", "rexie", "rocksdb", diff --git a/rust/noosphere-common/Cargo.toml b/rust/noosphere-common/Cargo.toml index aead7452d..9fa6798bc 100644 --- a/rust/noosphere-common/Cargo.toml +++ b/rust/noosphere-common/Cargo.toml @@ -32,3 +32,4 @@ tokio = { workspace = true, features = ["sync", "macros"] } futures = { workspace = true } wasm-bindgen = { workspace = true } wasm-bindgen-futures = { workspace = true } +gloo-timers = { workspace = true, features = ["futures"] } diff --git a/rust/noosphere-common/src/helpers/wait.rs b/rust/noosphere-common/src/helpers/wait.rs index 2604fa849..86a055ee8 100644 --- a/rust/noosphere-common/src/helpers/wait.rs +++ b/rust/noosphere-common/src/helpers/wait.rs @@ -1,8 +1,13 @@ -use std::time::Duration; - -/// Wait for the specified number of seconds; uses [tokio::time::sleep], so this -/// will yield to the async runtime rather than block until the sleep time is -/// elapsed. +/// Wait for the specified number of seconds, yielding to the async runtime +/// rather than block. Uses [tokio::time::sleep], or [gloo_timers::future::TimeoutFuture] +/// on `wasm32` targets. pub async fn wait(seconds: u64) { - tokio::time::sleep(Duration::from_secs(seconds)).await; + #[cfg(target_arch = "wasm32")] + { + gloo_timers::future::TimeoutFuture::new((seconds / 1000) as u32).await + } + #[cfg(not(target_arch = "wasm32"))] + { + tokio::time::sleep(std::time::Duration::from_secs(seconds)).await + } } diff --git a/rust/noosphere-common/src/task.rs b/rust/noosphere-common/src/task.rs index 6fc481f4e..55415fa1f 100644 --- a/rust/noosphere-common/src/task.rs +++ b/rust/noosphere-common/src/task.rs @@ -13,6 +13,8 @@ use futures::future::join_all; #[cfg(not(target_arch = "wasm32"))] use tokio::task::JoinSet; +use crate::ConditionalSend; + #[cfg(target_arch = "wasm32")] /// Spawn a future by scheduling it with the local executor. The returned /// future will be pending until the spawned future completes. @@ -43,6 +45,18 @@ where Ok(tokio::spawn(future).await?) } +/// Spawn a future by scheduling it with the local executor. The +/// future will immediately start processing. +pub fn spawn_immediate(future: F) +where + F: Future + ConditionalSend + 'static, +{ + #[cfg(target_arch = "wasm32")] + wasm_bindgen_futures::spawn_local(future); + #[cfg(not(target_arch = "wasm32"))] + let _ = tokio::runtime::Handle::current().spawn(future); +} + /// An aggregator of async work that can be used to observe the moment when all /// the aggregated work is completed. It is similar to tokio's [JoinSet], but is /// relatively constrained and also works on `wasm32-unknown-unknown`. Unlike diff --git a/rust/noosphere-core/src/authority/author.rs b/rust/noosphere-core/src/authority/author.rs index c307caa3c..a7ab4b06c 100644 --- a/rust/noosphere-core/src/authority/author.rs +++ b/rust/noosphere-core/src/authority/author.rs @@ -179,7 +179,7 @@ mod tests { #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] async fn it_gives_read_only_access_when_there_is_no_authorization() -> Result<()> { let author = Author::anonymous(); - let mut db = SphereDb::new(&MemoryStorage::default()).await?; + let mut db = SphereDb::new(MemoryStorage::default()).await?; let (sphere, _, _) = Sphere::generate("did:key:foo", &mut db).await?; @@ -201,7 +201,7 @@ mod tests { async fn it_gives_read_write_access_if_the_key_is_authorized() -> Result<()> { let owner_key = generate_ed25519_key(); let owner_did = Did(owner_key.get_did().await.unwrap()); - let mut db = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let mut db = SphereDb::new(MemoryStorage::default()).await.unwrap(); let (sphere, authorization, _) = Sphere::generate(&owner_did, &mut db).await.unwrap(); @@ -227,7 +227,7 @@ mod tests { async fn it_gives_read_write_access_to_the_root_sphere_credential() -> Result<()> { let owner_key = generate_ed25519_key(); let owner_did = Did(owner_key.get_did().await.unwrap()); - let mut db = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let mut db = SphereDb::new(MemoryStorage::default()).await.unwrap(); let (sphere, authorization, mnemonic) = Sphere::generate(&owner_did, &mut db).await.unwrap(); @@ -256,7 +256,7 @@ mod tests { async fn it_can_find_itself_in_an_authorization_lineage() -> Result<()> { let owner_key = generate_ed25519_key(); let owner_did = Did(owner_key.get_did().await.unwrap()); - let mut db = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let mut db = SphereDb::new(MemoryStorage::default()).await.unwrap(); let (sphere, authorization, _) = Sphere::generate(&owner_did, &mut db).await.unwrap(); diff --git a/rust/noosphere-core/src/context/context.rs b/rust/noosphere-core/src/context/context.rs index c8f62e1f6..54c5300db 100644 --- a/rust/noosphere-core/src/context/context.rs +++ b/rust/noosphere-core/src/context/context.rs @@ -393,7 +393,7 @@ mod tests { let mut records: Vec = vec![]; let owner_key = generate_ed25519_key(); let owner_did = owner_key.get_did().await?; - let mut db = SphereDb::new(&MemoryStorage::default()).await?; + let mut db = SphereDb::new(MemoryStorage::default()).await?; let (sphere, proof, _) = Sphere::generate(&owner_did, &mut db).await?; let ucan_proof = proof.as_ucan(&db).await?; let sphere_identity = sphere.get_identity().await?; diff --git a/rust/noosphere-core/src/data/address.rs b/rust/noosphere-core/src/data/address.rs index e833886bd..e8fd27310 100644 --- a/rust/noosphere-core/src/data/address.rs +++ b/rust/noosphere-core/src/data/address.rs @@ -403,7 +403,7 @@ mod tests { let sphere_identity = Did::from(sphere_key.get_did().await?); let link = "bafyr4iagi6t6khdrtbhmyjpjgvdlwv6pzylxhuhstxhkdp52rju7er325i"; let cid_link: Link = link.parse()?; - let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); let record = from_issuer(&sphere_key, &sphere_identity, &cid_link, None).await?; @@ -422,7 +422,7 @@ mod tests { let sphere_identity = Did::from(sphere_key.get_did().await?); let link = "bafyr4iagi6t6khdrtbhmyjpjgvdlwv6pzylxhuhstxhkdp52rju7er325i"; let cid_link: Cid = link.parse()?; - let mut store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let mut store = SphereDb::new(MemoryStorage::default()).await.unwrap(); // First verify that `owner` cannot publish for `sphere` // without delegation. @@ -486,7 +486,7 @@ mod tests { let sphere_key = generate_ed25519_key(); let sphere_identity = Did::from(sphere_key.get_did().await?); let cid_address = "bafyr4iagi6t6khdrtbhmyjpjgvdlwv6pzylxhuhstxhkdp52rju7er325i"; - let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); expect_failure( "fails when expect `fact` is missing", @@ -676,7 +676,7 @@ mod tests { let delegatee_key = generate_ed25519_key(); let delegatee_did = delegatee_key.get_did().await?; - let mut db = SphereDb::new(&MemoryStorage::default()).await?; + let mut db = SphereDb::new(MemoryStorage::default()).await?; let mut ucan_store = UcanStore(db.clone()); let (sphere, proof, _) = Sphere::generate(&owner_did, &mut db).await?; diff --git a/rust/noosphere-core/src/data/sphere.rs b/rust/noosphere-core/src/data/sphere.rs index df89c0f6d..4073bebcc 100644 --- a/rust/noosphere-core/src/data/sphere.rs +++ b/rust/noosphere-core/src/data/sphere.rs @@ -78,7 +78,7 @@ mod tests { let identity_credential = generate_ed25519_key(); let identity = Did(identity_credential.get_did().await?); - let mut store = SphereDb::new(&MemoryStorage::default()).await?; + let mut store = SphereDb::new(MemoryStorage::default()).await?; let sphere = SphereIpld::new(&identity, &mut store).await?; @@ -127,7 +127,7 @@ mod tests { let identity = Did(identity_credential.get_did().await?); let authorized = Did(authorized_credential.get_did().await?); - let mut store = SphereDb::new(&MemoryStorage::default()).await?; + let mut store = SphereDb::new(MemoryStorage::default()).await?; let sphere = SphereIpld::new(&identity, &mut store).await?; diff --git a/rust/noosphere-core/src/helpers/context.rs b/rust/noosphere-core/src/helpers/context.rs index 6d2bd8734..5c133c061 100644 --- a/rust/noosphere-core/src/helpers/context.rs +++ b/rust/noosphere-core/src/helpers/context.rs @@ -37,7 +37,7 @@ pub async fn simulated_sphere_context( Some(db) => db, None => { let storage_provider = TrackingStorage::wrap(MemoryStorage::default()); - SphereDb::new(&storage_provider).await? + SphereDb::new(storage_provider).await? } }; diff --git a/rust/noosphere-core/src/helpers/link.rs b/rust/noosphere-core/src/helpers/link.rs index f8290f4e4..f381d3eee 100644 --- a/rust/noosphere-core/src/helpers/link.rs +++ b/rust/noosphere-core/src/helpers/link.rs @@ -16,7 +16,7 @@ where { let owner_key = generate_ed25519_key(); let owner_did = owner_key.get_did().await?; - let mut db = SphereDb::new(&MemoryStorage::default()).await?; + let mut db = SphereDb::new(MemoryStorage::default()).await?; let (sphere, proof, _) = Sphere::generate(&owner_did, &mut db).await?; let ucan_proof = proof.as_ucan(&db).await?; diff --git a/rust/noosphere-into/examples/notes-to-html/implementation.rs b/rust/noosphere-into/examples/notes-to-html/implementation.rs index 0f323d073..55e4cbbba 100644 --- a/rust/noosphere-into/examples/notes-to-html/implementation.rs +++ b/rust/noosphere-into/examples/notes-to-html/implementation.rs @@ -27,7 +27,7 @@ use ucan::crypto::KeyMaterial; pub async fn main() -> Result<()> { let storage_provider = MemoryStorage::default(); - let mut db = SphereDb::new(&storage_provider).await.unwrap(); + let mut db = SphereDb::new(storage_provider).await.unwrap(); let owner_key: SphereContextKey = Arc::new(Box::new(generate_ed25519_key())); let owner_did = owner_key.get_did().await?; diff --git a/rust/noosphere-ipfs/src/storage.rs b/rust/noosphere-ipfs/src/storage.rs index f79e9dc1f..19c7d45d3 100644 --- a/rust/noosphere-ipfs/src/storage.rs +++ b/rust/noosphere-ipfs/src/storage.rs @@ -3,7 +3,7 @@ use anyhow::Result; use async_trait::async_trait; use cid::Cid; use noosphere_common::ConditionalSync; -use noosphere_storage::{BlockStore, Storage}; +use noosphere_storage::{BlockStore, EphemeralStorage, EphemeralStore, Storage}; use std::sync::Arc; use tokio::sync::RwLock; @@ -45,7 +45,6 @@ where C: IpfsClient + ConditionalSync, { type BlockStore = IpfsStore; - type KeyValueStore = S::KeyValueStore; async fn get_block_store(&self, name: &str) -> Result { @@ -58,6 +57,20 @@ where } } +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl EphemeralStorage for IpfsStorage +where + S: Storage + EphemeralStorage + ConditionalSync, + C: IpfsClient + ConditionalSync, +{ + type EphemeralStoreType = ::EphemeralStoreType; + + async fn get_ephemeral_store(&self) -> Result> { + self.local_storage.get_ephemeral_store().await + } +} + /// An implementation of [BlockStore] that wraps some other implementation of /// same. It forwards most behavior to its wrapped implementation, except when /// reading blocks. In that case, if a block cannot be found locally, it will diff --git a/rust/noosphere-ns/src/builder.rs b/rust/noosphere-ns/src/builder.rs index 9c9ab72e2..7288d34a3 100644 --- a/rust/noosphere-ns/src/builder.rs +++ b/rust/noosphere-ns/src/builder.rs @@ -23,7 +23,7 @@ use libp2p::kad::KademliaConfig; /// #[tokio::main(flavor = "multi_thread")] /// async fn main() { /// let key_material = generate_ed25519_key(); -/// let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); +/// let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); /// /// let ns = NameSystemBuilder::default() /// .ucan_store(store) @@ -190,7 +190,7 @@ mod tests { let keypair = key_material.to_dht_keypair()?; PeerId::from(keypair.public()) }; - let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); let bootstrap_peers: Vec = vec![ "/ip4/127.0.0.50/tcp/33333/p2p/12D3KooWH8WgH9mgbMXrKX4veokUznvEn6Ycwg4qaGNi83nLkoUK" .parse()?, diff --git a/rust/noosphere-ns/src/dht_client.rs b/rust/noosphere-ns/src/dht_client.rs index 9dcd0729a..4aa585671 100644 --- a/rust/noosphere-ns/src/dht_client.rs +++ b/rust/noosphere-ns/src/dht_client.rs @@ -133,7 +133,7 @@ pub mod test { // Now test another node connecting. let (_other_ns, other_peer_id) = { let key_material = generate_ed25519_key(); - let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); let ns = NameSystemBuilder::default() .ucan_store(store) .key_material(&key_material) diff --git a/rust/noosphere-ns/src/name_system.rs b/rust/noosphere-ns/src/name_system.rs index 6e9f8e4f2..e1a0b11bc 100644 --- a/rust/noosphere-ns/src/name_system.rs +++ b/rust/noosphere-ns/src/name_system.rs @@ -172,7 +172,7 @@ mod test { async fn before_name_resolver_tests() -> Result { let ns = { let key_material = generate_ed25519_key(); - let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); let ns = NameSystemBuilder::default() .ucan_store(store) .key_material(&key_material) @@ -205,7 +205,7 @@ mod test { async fn before_each() -> Result<(DataPlaceholder, Arc>)> { let (bootstrap, bootstrap_address) = { let key_material = generate_ed25519_key(); - let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); let ns = NameSystemBuilder::default() .ucan_store(store) .key_material(&key_material) @@ -221,7 +221,7 @@ mod test { let ns = { let key_material = generate_ed25519_key(); - let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); let ns = NameSystemBuilder::default() .ucan_store(store) .key_material(&key_material) diff --git a/rust/noosphere-ns/src/server/client.rs b/rust/noosphere-ns/src/server/client.rs index 4cc703f34..519b76085 100644 --- a/rust/noosphere-ns/src/server/client.rs +++ b/rust/noosphere-ns/src/server/client.rs @@ -146,7 +146,7 @@ mod test { async fn before_each() -> Result<(DataPlaceholder, Arc>)> { let (bootstrap, bootstrap_address) = { let key_material = generate_ed25519_key(); - let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); let ns = NameSystemBuilder::default() .ucan_store(store) .key_material(&key_material) @@ -164,7 +164,7 @@ mod test { let api_port = api_listener.local_addr().unwrap().port(); let api_url = Url::parse(&format!("http://127.0.0.1:{}", api_port))?; let key_material = generate_ed25519_key(); - let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); let ns = NameSystemBuilder::default() .ucan_store(store) diff --git a/rust/noosphere-ns/tests/ns_test.rs b/rust/noosphere-ns/tests/ns_test.rs index c95fdb9ab..b612ae486 100644 --- a/rust/noosphere-ns/tests/ns_test.rs +++ b/rust/noosphere-ns/tests/ns_test.rs @@ -75,7 +75,7 @@ async fn test_name_system_peer_propagation() -> Result<()> { initialize_tracing(None); // Create two NameSystems, where `ns_1` is publishing for `sphere_1` // and `ns_2` is publishing for `sphere_2`. - let mut db = SphereDb::new(&MemoryStorage::default()).await?; + let mut db = SphereDb::new(MemoryStorage::default()).await?; let network = NameSystemNetwork::generate(3, Some(db.clone())).await?; let sphere_1_cid_1 = derive_cid::(b"00000000"); let sphere_1_cid_2 = derive_cid::(b"11111111"); @@ -172,7 +172,7 @@ async fn test_name_system_peer_propagation() -> Result<()> { #[tokio::test] async fn test_name_system_validation() -> Result<()> { initialize_tracing(None); - let mut db = SphereDb::new(&MemoryStorage::default()).await?; + let mut db = SphereDb::new(MemoryStorage::default()).await?; let network = NameSystemNetwork::generate(2, Some(db.clone())).await?; let ns_1 = network.get(1).unwrap(); diff --git a/rust/noosphere-storage/Cargo.toml b/rust/noosphere-storage/Cargo.toml index d91db9e40..40633e8e3 100644 --- a/rust/noosphere-storage/Cargo.toml +++ b/rust/noosphere-storage/Cargo.toml @@ -21,6 +21,7 @@ readme = "README.md" anyhow = { workspace = true } async-trait = "~0.1" async-stream = { workspace = true } +futures = { workspace = true } tokio-stream = { workspace = true } cid = { workspace = true } noosphere-common = { version = "0.1.0", path = "../noosphere-common" } @@ -28,33 +29,33 @@ tracing = "~0.1" ucan = { workspace = true } libipld-core = { workspace = true } libipld-cbor = { workspace = true } +rand = { workspace = true } serde = { workspace = true } base64 = "=0.21.2" url = { version = "^2" } [dev-dependencies] -witty-phrase-generator = "~0.2" wasm-bindgen-test = { workspace = true } -rand = { workspace = true } noosphere-core-dev = { path = "../noosphere-core", features = ["helpers"], package = "noosphere-core" } noosphere-common = { path = "../noosphere-common", features = ["helpers"] } instant = { version = "0.1.12", features = ["wasm-bindgen"] } -[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] -tempfile = { workspace = true } - [target.'cfg(not(target_arch = "wasm32"))'.dependencies] +tempfile = { workspace = true } sled = "~0.34" tokio = { workspace = true, features = ["full"] } rocksdb = { version = "0.21.0", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] -tokio = { workspace = true, features = ["sync", "macros"] } +tokio = { workspace = true, features = ["sync", "macros", "rt"] } wasm-bindgen = { workspace = true, features = ["serde-serialize"] } wasm-bindgen-futures = { workspace = true } +witty-phrase-generator = "~0.2" serde-wasm-bindgen = { workspace = true } js-sys = { workspace = true } rexie = { version = "~0.5" } +# REMOVE +num-traits = { version = "0.2.15", default-features = false } [target.'cfg(target_arch = "wasm32")'.dependencies.web-sys] version = "~0.3" diff --git a/rust/noosphere-storage/examples/bench/main.rs b/rust/noosphere-storage/examples/bench/main.rs index 7bcd796f9..c04f45e0b 100644 --- a/rust/noosphere-storage/examples/bench/main.rs +++ b/rust/noosphere-storage/examples/bench/main.rs @@ -132,9 +132,7 @@ impl BenchmarkStorage { ))] let (storage, storage_name) = { ( - noosphere_storage::SledStorage::new(noosphere_storage::SledStorageInit::Path( - storage_path.into(), - ))?, + noosphere_storage::SledStorage::new(&storage_path)?, "SledDbStorage", ) }; @@ -142,7 +140,7 @@ impl BenchmarkStorage { #[cfg(all(not(target_arch = "wasm32"), feature = "rocksdb"))] let (storage, storage_name) = { ( - noosphere_storage::RocksDbStorage::new(storage_path.into())?, + noosphere_storage::RocksDbStorage::new(&storage_path).await?, "RocksDbStorage", ) }; @@ -181,7 +179,7 @@ impl BenchmarkStorage { } pub async fn sphere_db(&self) -> Result> { - SphereDb::new(&self.storage).await + SphereDb::new(self.storage.clone()).await } pub async fn as_stats(&mut self) -> Result { diff --git a/rust/noosphere-storage/examples/bench/performance.rs b/rust/noosphere-storage/examples/bench/performance.rs index 09a5f0948..1e2e52b06 100644 --- a/rust/noosphere-storage/examples/bench/performance.rs +++ b/rust/noosphere-storage/examples/bench/performance.rs @@ -1,7 +1,7 @@ use anyhow::{Error, Result}; use async_trait::async_trait; use instant::{Duration, Instant}; -use noosphere_storage::{Space, Storage, Store}; +use noosphere_storage::{EphemeralStorage, EphemeralStore, Space, Storage, Store}; use std::{collections::HashMap, sync::Arc}; use tokio::sync::Mutex; @@ -208,3 +208,16 @@ impl Store for PerformanceStore { result } } + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl EphemeralStorage for PerformanceStorage +where + S: Storage, +{ + type EphemeralStoreType = ::EphemeralStoreType; + + async fn get_ephemeral_store(&self) -> Result> { + self.storage.get_ephemeral_store().await + } +} diff --git a/rust/noosphere-storage/sleddb/conf b/rust/noosphere-storage/sleddb/conf new file mode 100644 index 000000000..4154d7c45 --- /dev/null +++ b/rust/noosphere-storage/sleddb/conf @@ -0,0 +1,4 @@ +segment_size: 524288 +use_compression: false +version: 0.34 +vQÁ \ No newline at end of file diff --git a/rust/noosphere-storage/sleddb/db b/rust/noosphere-storage/sleddb/db new file mode 100644 index 0000000000000000000000000000000000000000..31aa8ab6754023e5d9d814ee86df3c7857e3a82a GIT binary patch literal 524287 zcmeI%O=uit7zgl4vLDe(YU@EoRN9NZ6xt@+P3lL)BJ@QNVK2?DiVkaRZ(dXRHO)g^x!8Nr*1~+tn*HG84H2m9I{!G+5bMj zdEa-RcV~Y3-R1Mm<3Q9r9veRJ!S9v$_lI6NIZ^J4x}tAaJ$b0NH|k#ahv>)E<9mBA zje45TySk$8*(au-uB?cnujkC<7hB>}OUbFDV|QEwkkwzw%u4D> z>dUXIad5a1$QmfJu9vSpd@7JtTYO#lhJAbM+X7h|i_ALu)~*)=S@j~b_V?XHRD8Y(jDil_EW2C~*IJ}ZCBe|*`wS|DqEkz@X#|WH-CK~YoN%w-rjoe>w&CVk##N2bt8YwU$^~_l>x)SA`Pt@>AtD`9>BVhHq7g7 ze%W-(XBYH`<;w?V&IZPOExSKVu2+(o?nR$n@j~f>{;;y=rk~dYvNmSVucTMY>JO)S zf7lvl{o7+#A3ZsHCXiLn?y-}4lKS%N>U`a3zpiDkz4KL|uA#-(mERxgH`<+Ft$QN< zM%pl$S6Oqpb+1=kzmeYq%zl6L%fS4qXRlH1*JYoQ9vjKpo2-lZ*ULU7ZP@-ev`Why8pW7`DL$$`i;)_ zU#<57=5E-2XCP}ZJ1hA-POe+JuGYTy&1VjM8ps+dvXA=z_9?B`tMmIQ$-0pk=I<*z zzn_v^udE)sb-(<; SphereDb where S: Storage, { - pub async fn new(storage: &S) -> Result> { + pub async fn new(storage: S) -> Result> { Ok(SphereDb { block_store: storage.get_block_store(BLOCK_STORE).await?, link_store: storage.get_key_value_store(LINK_STORE).await?, version_store: storage.get_key_value_store(VERSION_STORE).await?, metadata_store: storage.get_key_value_store(METADATA_STORE).await?, + storage, }) } @@ -277,6 +285,19 @@ where } } +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl EphemeralStorage for SphereDb +where + S: Storage, +{ + type EphemeralStoreType = ::EphemeralStoreType; + + async fn get_ephemeral_store(&self) -> Result> { + self.storage.get_ephemeral_store().await + } +} + #[cfg(test)] mod tests { @@ -297,7 +318,7 @@ mod tests { #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] pub async fn it_stores_links_when_a_block_is_saved() { let storage_provider = MemoryStorage::default(); - let mut db = SphereDb::new(&storage_provider).await.unwrap(); + let mut db = SphereDb::new(storage_provider).await.unwrap(); let list1 = vec!["cats", "dogs", "pigeons"]; let list2 = vec!["apples", "oranges", "starfruit"]; @@ -318,7 +339,7 @@ mod tests { #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] pub async fn it_can_stream_all_blocks_in_a_dag() { let storage_provider = MemoryStorage::default(); - let mut db = SphereDb::new(&storage_provider).await.unwrap(); + let mut db = SphereDb::new(storage_provider).await.unwrap(); let list1 = vec!["cats", "dogs", "pigeons"]; let list2 = vec!["apples", "oranges", "starfruit"]; @@ -353,7 +374,7 @@ mod tests { #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] pub async fn it_can_put_a_raw_block_and_read_it_as_a_token() { let storage_provider = MemoryStorage::default(); - let mut db = SphereDb::new(&storage_provider).await.unwrap(); + let mut db = SphereDb::new(storage_provider).await.unwrap(); let (cid, block) = block_encode::(&Ipld::Bytes(b"foobar".to_vec())).unwrap(); @@ -363,4 +384,52 @@ mod tests { assert_eq!(token, Some("foobar".into())); } + + /* + async fn test_storage_scratch_provider(db: SphereDb) -> anyhow::Result<()> + where + S: Storage + EphemeralStorage, + S::EphemeralStore: ConditionalSend + 'static, + { + let mut stores = vec![]; + for n in 1..3 { + stores.push((db.get_scratch_store().await?, n)); + } + + for record in stores.iter_mut() { + record + .0 + .set_key(format!("foo-{}", record.1), format!("bar-{}", record.1)) + .await?; + } + + for record in stores { + for i in 1..3 { + let value = record.0.get_key(&format!("foo-{}", i)).await?; + if record.1 == i { + assert_eq!(value, Some(format!("bar-{}", i))); + } else { + assert_eq!(value, None); + } + } + } + Ok(()) + } + + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + pub async fn it_supports_scratch_space_platform_default() -> anyhow::Result<()> { + let storage_provider = make_disposable_storage().await?; + let db = SphereDb::new(storage_provider).await.unwrap(); + test_storage_scratch_provider(db).await + } + + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + pub async fn it_supports_scratch_space_memory() -> anyhow::Result<()> { + let storage_provider = MemoryStorage::default(); + let db = SphereDb::new(storage_provider).await.unwrap(); + test_storage_scratch_provider(db).await + } + */ } diff --git a/rust/noosphere-storage/src/ephemeral.rs b/rust/noosphere-storage/src/ephemeral.rs new file mode 100644 index 000000000..a325ef874 --- /dev/null +++ b/rust/noosphere-storage/src/ephemeral.rs @@ -0,0 +1,251 @@ +use std::sync::Arc; + +use crate::{KeyValueStore, Store}; +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use futures::Future; +use noosphere_common::ConditionalSync; +use tokio::sync::{Mutex, OnceCell}; + +/// Provides an [EphemeralStore] that does not persist after dropping. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait EphemeralStorage: ConditionalSync { + type EphemeralStoreType: KeyValueStore + Disposable; + + async fn get_ephemeral_store(&self) -> Result>; +} + +/// A [Store] that can clear its data after dropping as an [EphemeralStore]. +/// +/// A [Disposable] store is only cleared when used as an [EphemeralStore]. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait Disposable: Store { + async fn dispose(&mut self) -> Result<()>; +} + +/// Wrapper [Store], ensuring underlying data does not persist. +/// +/// As [Store] must be [Sync] and [Clone], [EphemeralStore] clears +/// its data after all references have been dropped. +#[derive(Clone)] +pub struct EphemeralStore(OnceCell>>) +where + S: KeyValueStore + Disposable + 'static; + +impl EphemeralStore +where + S: KeyValueStore + Disposable + 'static, +{ + pub(crate) fn new(store: S) -> Self { + Self(OnceCell::from(Arc::new(Mutex::new(store)))) + } + + async fn store<'a>(&'a self) -> Result> { + Ok(self + .0 + .get() + .ok_or_else(|| anyhow!("Inner store not set."))? + .lock() + .await) + } +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Store for EphemeralStore +where + S: KeyValueStore + Disposable + 'static, +{ + async fn read(&self, key: &[u8]) -> Result>> { + let store = self.store().await?; + store.read(key).await + } + + async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result>> { + let mut store = self.store().await?; + store.write(key, bytes).await + } + + async fn remove(&mut self, key: &[u8]) -> Result>> { + let mut store = self.store().await?; + store.remove(key).await + } + + async fn flush(&self) -> Result<()> { + let store = self.store().await?; + Store::flush(std::ops::Deref::deref(&store)).await + } +} + +impl From for EphemeralStore +where + S: KeyValueStore + Disposable + 'static, +{ + fn from(value: S) -> Self { + EphemeralStore::new(value) + } +} + +//#[cfg(not(target_arch = "wasm32"))] +impl Drop for EphemeralStore +where + S: KeyValueStore + Disposable + 'static, +{ + // # noosphere_common::spawn + // requires an .await to invoke + // + // # tokio::runtime::Handle::current().spawn + // Works on native, needs waits in tests + // + // # tokio::runtime::Handle::current().block_on + // native: "Cannot start a runtime within a runtime" + // wasm32 (in WBT runner): no Tokio reactor running + // + // # futures::executor::block_on + // native: OK + // wasm32: busy loop/locks + fn drop(&mut self) { + if let Some(store_arc) = self.0.take() { + if let Some(store_mutex) = Arc::into_inner(store_arc) { + let mut store = store_mutex.into_inner(); + noosphere_common::spawn_immediate(async move { + if let Err(e) = store.dispose().await { + error!("Error disposing EphemeralStore: {}", e); + } + }); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + EphemeralStorage, IterableStore, NonPersistentStorage, OpenStorage, + PreferredPlatformStorage, Storage, EPHEMERAL_STORE, + }; + use noosphere_core_dev::tracing::initialize_tracing; + use std::path::Path; + use tokio_stream::StreamExt; + + #[cfg(target_arch = "wasm32")] + use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure}; + #[cfg(target_arch = "wasm32")] + wasm_bindgen_test_configure!(run_in_browser); + + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + async fn it_clears_ephemeral_storage_on_store_drop() -> Result<()> { + initialize_tracing(None); + + let storage = NonPersistentStorage::::new().await?; + + for _ in 0..3 { + let mut ephemeral_store = storage.get_ephemeral_store().await?; + for n in 0..10 { + ephemeral_store + .write(format!("{}", n).as_ref(), &vec![2; 1000]) + .await?; + } + } + + // Wait for destructors to complete asynchronously. + noosphere_common::helpers::wait(1).await; + + #[cfg(all(not(target_arch = "wasm32"), not(feature = "rocksdb")))] + { + // Sled's storage space still grows on a new DB when removing trees. + // Otherwise we could also test deletion via [crate::Space]. + let db = storage.inner(); + assert_eq!(db.tree_names().len(), 1, "only default tree persists."); + } + + { + // Ensure there's no extra data in the ephemeral space + // (IndexedDbStorage, RocksDbStorage) + let store = storage.get_key_value_store(EPHEMERAL_STORE).await?; + let mut stream = store.get_all_entries(); + assert!( + stream.try_next().await?.is_none(), + "ephemeral store should have no entries." + ); + } + Ok(()) + } + + /// Circumvent using [EphemeralStore] to ensure ephemeral data is wiped + /// on storage drop/init in the event [EphemeralStore] cleanups do not occur. + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + async fn it_clears_ephemeral_storage_on_startup() -> Result<()> { + #[cfg(target_arch = "wasm32")] + let db_path: String = witty_phrase_generator::WPGen::new() + .with_words(3) + .unwrap() + .into_iter() + .map(|word| String::from(word)) + .collect(); + + #[cfg(not(target_arch = "wasm32"))] + let _temp_dir = tempfile::TempDir::new()?; + #[cfg(not(target_arch = "wasm32"))] + let db_path = _temp_dir.path(); + + #[cfg(any(target_arch = "wasm32", feature = "rocksdb"))] + let result = test_partition_based_ephemeral_storage_startup(db_path).await?; + #[cfg(all(not(target_arch = "wasm32"), not(feature = "rocksdb")))] + let result = test_tree_based_ephemeral_storage_startup(db_path).await?; + + Ok(result) + } + + /// IndexedDbStorage and RocksDbStorage write to the shared ephemeral space. + /// Write directly to the ephemeral store to ensure it is cleaned up + /// on storage drop/init. + #[cfg(any(target_arch = "wasm32", feature = "rocksdb"))] + async fn test_partition_based_ephemeral_storage_startup>( + db_path: P, + ) -> Result<()> { + { + let storage = PreferredPlatformStorage::open(db_path.as_ref()).await?; + let mut store = storage.get_key_value_store(EPHEMERAL_STORE).await?; + store.write(&[0], &[11]).await?; + assert_eq!(store.read(&vec![0]).await?, Some(vec![11])); + } + + let storage = PreferredPlatformStorage::open(db_path.as_ref()).await?; + let store = storage.get_key_value_store(EPHEMERAL_STORE).await?; + let mut stream = store.get_all_entries(); + assert!( + stream.try_next().await?.is_none(), + "ephemeral store should have no entries." + ); + Ok(()) + } + + // SledStorage uses ephemeral trees. Ensure the trees do not exist after + // reinitialization. + #[cfg(all(not(target_arch = "wasm32"), not(feature = "rocksdb")))] + async fn test_tree_based_ephemeral_storage_startup>(db_path: P) -> Result<()> { + let mut store_name = String::from(crate::implementation::EPHEMERAL_SLED_PREFIX); + store_name.push_str("-1234567890"); + { + let storage = PreferredPlatformStorage::open(db_path.as_ref()).await?; + let mut store = storage.get_key_value_store(&store_name).await?; + store.write(&[1], &[11]).await?; + assert_eq!(store.read(&vec![1]).await?, Some(vec![11])); + } + + let storage = PreferredPlatformStorage::open(db_path.as_ref()).await?; + let store = storage.get_key_value_store(&store_name).await?; + let mut stream = store.get_all_entries(); + assert!( + stream.try_next().await?.is_none(), + "ephemeral store should have no entries." + ); + Ok(()) + } +} diff --git a/rust/noosphere-storage/src/helpers.rs b/rust/noosphere-storage/src/helpers.rs deleted file mode 100644 index 5c531ef3c..000000000 --- a/rust/noosphere-storage/src/helpers.rs +++ /dev/null @@ -1,34 +0,0 @@ -use crate::Storage; -use anyhow::Result; - -#[cfg(not(target_arch = "wasm32"))] -use crate::{SledStorage, SledStorageInit, SledStore}; - -#[cfg(not(target_arch = "wasm32"))] -pub async fn make_disposable_store() -> Result { - let temp_dir = std::env::temp_dir(); - let temp_name: String = witty_phrase_generator::WPGen::new() - .with_words(3) - .unwrap() - .into_iter() - .map(String::from) - .collect(); - let provider = SledStorage::new(SledStorageInit::Path(temp_dir.join(temp_name)))?; - provider.get_block_store("foo").await -} - -#[cfg(target_arch = "wasm32")] -use crate::{IndexedDbStorage, IndexedDbStore}; - -#[cfg(target_arch = "wasm32")] -pub async fn make_disposable_store() -> Result { - let temp_name: String = witty_phrase_generator::WPGen::new() - .with_words(3) - .unwrap() - .into_iter() - .map(|word| String::from(word)) - .collect(); - - let provider = IndexedDbStorage::new(&temp_name).await?; - provider.get_block_store(crate::db::BLOCK_STORE).await -} diff --git a/rust/noosphere-storage/src/implementation/indexed_db.rs b/rust/noosphere-storage/src/implementation/indexed_db.rs index f80113738..4c3c8ef2b 100644 --- a/rust/noosphere-storage/src/implementation/indexed_db.rs +++ b/rust/noosphere-storage/src/implementation/indexed_db.rs @@ -1,16 +1,24 @@ use crate::store::Store; -use crate::{db::SPHERE_DB_STORE_NAMES, storage::Storage}; -use anyhow::{anyhow, Error, Result}; +use crate::{ + db::{EPHEMERAL_STORE, SPHERE_DB_STORE_NAMES}, + storage::Storage, + PartitionedStore, +}; +use anyhow::{anyhow, Result}; +use async_stream::try_stream; use async_trait::async_trait; use js_sys::Uint8Array; +use noosphere_common::ConditionalSend; use rexie::{ KeyRange, ObjectStore, Rexie, RexieBuilder, Store as IdbStore, Transaction, TransactionMode, }; use serde::{Deserialize, Serialize}; -use std::{fmt::Debug, rc::Rc}; +use std::{fmt::Debug, path::Path, rc::Rc}; use wasm_bindgen::{JsCast, JsValue}; -pub const INDEXEDDB_STORAGE_VERSION: u32 = 1; +use js_utils::*; + +pub const INDEXEDDB_STORAGE_VERSION: u32 = 2; #[derive(Clone)] pub struct IndexedDbStorage { @@ -26,7 +34,10 @@ impl Debug for IndexedDbStorage { impl IndexedDbStorage { pub async fn new(db_name: &str) -> Result { - Self::configure(INDEXEDDB_STORAGE_VERSION, db_name, SPHERE_DB_STORE_NAMES).await + let storage = + Self::configure(INDEXEDDB_STORAGE_VERSION, db_name, SPHERE_DB_STORE_NAMES).await?; + storage.clear_ephemeral().await?; + Ok(storage) } async fn configure(version: u32, db_name: &str, store_names: &[&str]) -> Result { @@ -74,6 +85,12 @@ impl IndexedDbStorage { .await .map_err(|error| anyhow!("{:?}", error)) } + + /// Wipes the "ephemeral" column family. + async fn clear_ephemeral(&self) -> Result<()> { + let ephemeral_store = self.get_store(EPHEMERAL_STORE).await?; + ephemeral_store.clear().await + } } #[async_trait(?Send)] @@ -91,6 +108,31 @@ impl Storage for IndexedDbStorage { } } +#[async_trait(?Send)] +impl crate::ops::OpenStorage for IndexedDbStorage { + async fn open + ConditionalSend>(path: P) -> Result { + IndexedDbStorage::new( + path.as_ref() + .to_str() + .ok_or_else(|| anyhow!("Could not stringify path."))?, + ) + .await + } +} + +#[async_trait(?Send)] +impl crate::EphemeralStorage for IndexedDbStorage { + type EphemeralStoreType = EphemeralIndexedDbStore; + + async fn get_ephemeral_store(&self) -> Result> { + let store = PartitionedStore::new(IndexedDbStore { + db: self.db.clone(), + store_name: EPHEMERAL_STORE.to_owned(), + }); + Ok(crate::EphemeralIndexedDbStore::new(store).into()) + } +} + #[derive(Clone)] pub struct IndexedDbStore { db: Rc, @@ -98,7 +140,10 @@ pub struct IndexedDbStore { } impl IndexedDbStore { - fn start_transaction(&self, mode: TransactionMode) -> Result<(IdbStore, Transaction)> { + pub(crate) fn start_transaction( + &self, + mode: TransactionMode, + ) -> Result<(IdbStore, Transaction)> { let tx = self .db .transaction(&[&self.store_name], mode) @@ -110,15 +155,36 @@ impl IndexedDbStore { Ok((store, tx)) } - async fn finish_transaction(tx: Transaction) -> Result<()> { + pub(crate) async fn finish_transaction(tx: Transaction) -> Result<()> { tx.done().await.map_err(|error| anyhow!("{:?}", error))?; Ok(()) } - fn bytes_to_typed_array(bytes: &[u8]) -> Result { - let array = Uint8Array::new_with_length(bytes.len() as u32); - array.copy_from(&bytes); - Ok(JsValue::from(array)) + async fn clear(&self) -> Result<()> { + let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?; + store + .clear() + .await + .map_err(|error| anyhow!("{:?}", error))?; + IndexedDbStore::finish_transaction(tx).await?; + Ok(()) + } + + async fn remove_range(&self, from: &[u8], to: &[u8]) -> Result<()> { + let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?; + + let lower = bytes_to_typed_array(from)?; + let upper = bytes_to_typed_array(to)?; + let key_range = + KeyRange::bound(&lower, &upper, false, false).map_err(|e| anyhow!("{:?}", e))?; + + store + .delete(key_range.as_ref()) + .await + .map_err(|error| anyhow!("{:?}", error))?; + + IndexedDbStore::finish_transaction(tx).await?; + Ok(()) } async fn contains(key: &JsValue, store: &IdbStore) -> Result { @@ -151,7 +217,7 @@ impl IndexedDbStore { impl Store for IndexedDbStore { async fn read(&self, key: &[u8]) -> Result>> { let (store, tx) = self.start_transaction(TransactionMode::ReadOnly)?; - let key = IndexedDbStore::bytes_to_typed_array(key)?; + let key = bytes_to_typed_array(key)?; let maybe_dag = IndexedDbStore::read(&key, &store).await?; @@ -163,8 +229,8 @@ impl Store for IndexedDbStore { async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result>> { let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?; - let key = IndexedDbStore::bytes_to_typed_array(key)?; - let value = IndexedDbStore::bytes_to_typed_array(bytes)?; + let key = bytes_to_typed_array(key)?; + let value = bytes_to_typed_array(bytes)?; let old_bytes = IndexedDbStore::read(&key, &store).await?; @@ -181,7 +247,7 @@ impl Store for IndexedDbStore { async fn remove(&mut self, key: &[u8]) -> Result>> { let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?; - let key = IndexedDbStore::bytes_to_typed_array(key)?; + let key = bytes_to_typed_array(key)?; let old_value = IndexedDbStore::read(&key, &store).await?; @@ -196,28 +262,83 @@ impl Store for IndexedDbStore { } } -struct JsError(Error); +impl crate::IterableStore for IndexedDbStore { + fn get_all_entries(&self) -> std::pin::Pin>> { + Box::pin(try_stream! { + let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?; + let limit = 100; + let mut offset = 0; + loop { + let results = store.get_all(None, Some(limit), Some(offset), None).await + .map_err(|error| anyhow!("{:?}", error))?; + let count = results.len(); + if count == 0 { + IndexedDbStore::finish_transaction(tx).await?; + break; + } + + offset += count as u32; + + for (key_js, value_js) in results { + yield ( + typed_array_to_bytes(JsValue::from(Uint8Array::new(&key_js)))?, + typed_array_to_bytes(value_js)? + ); + } + } + }) + } +} + +/// A [IndexedDbStore] that does not persist data after dropping. +/// Can be created from [IndexedDbStorage::get_ephemeral_store]. +#[derive(Clone)] +pub struct EphemeralIndexedDbStore { + store: PartitionedStore, +} -impl From for JsError { - fn from(value: JsValue) -> JsError { - if let Ok(js_string) = js_sys::JSON::stringify(&value) { - JsError(anyhow!("{}", js_string.as_string().unwrap())) - } else { - JsError(anyhow!("Could not parse JsValue error as string.")) - } +impl EphemeralIndexedDbStore { + pub(crate) fn new(store: PartitionedStore) -> Self { + EphemeralIndexedDbStore { store } } } -impl From for JsError { - fn from(value: serde_wasm_bindgen::Error) -> JsError { - let js_value: JsValue = value.into(); - js_value.into() +#[async_trait(?Send)] +impl Store for EphemeralIndexedDbStore { + async fn read(&self, key: &[u8]) -> Result>> { + self.store.read(key).await + } + + async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result>> { + self.store.write(key, bytes).await + } + + async fn remove(&mut self, key: &[u8]) -> Result>> { + self.store.remove(key).await + } + + async fn flush(&self) -> Result<()> { + self.store.flush().await + } +} + +#[async_trait(?Send)] +impl crate::Disposable for EphemeralIndexedDbStore { + async fn dispose(&mut self) -> Result<()> { + let (start_key, end_key) = self.store.get_key_range(); + let mut new_end_key = start_key.to_owned(); + new_end_key.extend(format!("\u{FFFF}").as_bytes()); + self.store + .inner() + .remove_range(start_key, &new_end_key) + .await } } -impl From for Error { - fn from(value: JsError) -> Self { - value.0 +#[async_trait(?Send)] +impl crate::IterableStore for EphemeralIndexedDbStore { + fn get_all_entries(&self) -> std::pin::Pin>> { + self.store.inner().get_all_entries() } } @@ -263,3 +384,111 @@ impl crate::Space for IndexedDbStorage { } } } + +mod js_utils { + use anyhow::{anyhow, Error, Result}; + use js_sys::Uint8Array; + use wasm_bindgen::{JsCast, JsValue}; + + pub struct JsError(Error); + + impl From for JsError { + fn from(value: JsValue) -> JsError { + if let Ok(js_string) = js_sys::JSON::stringify(&value) { + JsError(anyhow!("{}", js_string.as_string().unwrap())) + } else { + JsError(anyhow!("Could not parse JsValue error as string.")) + } + } + } + + impl From for JsError { + fn from(value: serde_wasm_bindgen::Error) -> JsError { + let js_value: JsValue = value.into(); + js_value.into() + } + } + + impl From for Error { + fn from(value: JsError) -> Self { + value.0 + } + } + + pub fn bytes_to_typed_array(bytes: &[u8]) -> Result { + let array = Uint8Array::new_with_length(bytes.len() as u32); + array.copy_from(&bytes); + Ok(JsValue::from(array)) + } + + pub fn typed_array_to_bytes(js_value: JsValue) -> Result> { + Ok(js_value + .dyn_into::() + .map_err(|error| anyhow!("{:?}", error))? + .to_vec()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::key_value::KeyValueStore; + use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure}; + wasm_bindgen_test_configure!(run_in_browser); + + #[derive(Clone)] + pub struct IndexedDbStorageV1 { + db: Rc, + } + + impl IndexedDbStorageV1 { + pub async fn new(db_name: &str) -> Result { + const V1_STORES: [&str; 4] = ["blocks", "links", "versions", "metadata"]; + let mut builder = RexieBuilder::new(db_name).version(1); + + for name in V1_STORES { + builder = builder.add_object_store(ObjectStore::new(name).auto_increment(false)); + } + + let db = builder + .build() + .await + .map_err(|error| anyhow!("{:?}", error))?; + + Ok(IndexedDbStorageV1 { db: Rc::new(db) }) + } + + async fn get_store(&self, name: &str) -> Result { + if self + .db + .store_names() + .iter() + .find(|val| val.as_str() == name) + .is_none() + { + return Err(anyhow!("No such store named {}", name)); + } + + Ok(IndexedDbStore { + db: self.db.clone(), + store_name: name.to_string(), + }) + } + } + + #[wasm_bindgen_test] + async fn it_can_upgrade_from_v1() -> Result<()> { + let key = String::from("foo"); + let value = String::from("bar"); + { + let storage_v1 = IndexedDbStorageV1::new("v1_test").await?; + let mut store_v1 = storage_v1.get_store("links").await?; + store_v1.set_key(&key, &value).await?; + } + + let storage_v2 = IndexedDbStorage::new("v1_test").await?; + let store_v2 = storage_v2.get_store("links").await?; + assert_eq!(store_v2.get_key::<_, String>(&key).await?.unwrap(), value); + Ok(()) + } +} diff --git a/rust/noosphere-storage/src/implementation/memory.rs b/rust/noosphere-storage/src/implementation/memory.rs index 8fd92e692..0361a33c2 100644 --- a/rust/noosphere-storage/src/implementation/memory.rs +++ b/rust/noosphere-storage/src/implementation/memory.rs @@ -131,7 +131,24 @@ impl Store for MemoryStore { } } -#[cfg(feature = "performance")] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl crate::EphemeralStorage for MemoryStorage { + type EphemeralStoreType = MemoryStore; + + async fn get_ephemeral_store(&self) -> Result> { + Ok(MemoryStore::default().into()) + } +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl crate::Disposable for MemoryStore { + async fn dispose(&mut self) -> Result<()> { + Ok(()) + } +} + #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl crate::Space for MemoryStorage { diff --git a/rust/noosphere-storage/src/implementation/rocks_db.rs b/rust/noosphere-storage/src/implementation/rocks_db.rs index cd2dd4bf0..d9840bde3 100644 --- a/rust/noosphere-storage/src/implementation/rocks_db.rs +++ b/rust/noosphere-storage/src/implementation/rocks_db.rs @@ -1,7 +1,11 @@ -use crate::{storage::Storage, store::Store, SPHERE_DB_STORE_NAMES}; +use crate::{ + storage::Storage, store::Store, PartitionedStore, EPHEMERAL_STORE, SPHERE_DB_STORE_NAMES, +}; use anyhow::{anyhow, Result}; +use async_stream::try_stream; use async_trait::async_trait; -use rocksdb::{ColumnFamilyDescriptor, DBWithThreadMode, Options}; +use noosphere_common::ConditionalSend; +use rocksdb::{ColumnFamilyDescriptor, DBWithThreadMode, IteratorMode, Options}; use std::{ path::{Path, PathBuf}, sync::Arc, @@ -29,14 +33,16 @@ pub struct RocksDbStorage { } impl RocksDbStorage { - pub fn new(path: PathBuf) -> Result { - std::fs::create_dir_all(&path)?; - let canonicalized = path.canonicalize()?; + pub async fn new + ConditionalSend>(path: P) -> Result { + std::fs::create_dir_all(path.as_ref())?; + let canonicalized = path.as_ref().canonicalize()?; let db = Arc::new(RocksDbStorage::init_db(canonicalized.clone())?); - Ok(RocksDbStorage { + let storage = RocksDbStorage { db, path: canonicalized, - }) + }; + storage.clear_ephemeral().await?; + Ok(storage) } async fn get_store(&self, name: &str) -> Result { @@ -67,6 +73,12 @@ impl RocksDbStorage { Ok(DbInner::open_cf_descriptors(&db_opts, path, cfs)?) } + + /// Wipes the "ephemeral" column family. + async fn clear_ephemeral(&self) -> Result<()> { + let ephemeral_store = self.get_store(EPHEMERAL_STORE).await?; + ephemeral_store.remove_range(&[0], &[u8::MAX]).await + } } #[async_trait] @@ -83,6 +95,31 @@ impl Storage for RocksDbStorage { } } +#[async_trait] +impl crate::EphemeralStorage for RocksDbStorage { + type EphemeralStoreType = EphemeralRocksDbStore; + + async fn get_ephemeral_store(&self) -> Result> { + let inner = self.get_store(crate::EPHEMERAL_STORE).await?; + let mapped = crate::PartitionedStore::new(inner); + Ok(EphemeralRocksDbStore::new(mapped).into()) + } +} + +#[async_trait] +impl crate::OpenStorage for RocksDbStorage { + async fn open + ConditionalSend>(path: P) -> Result { + RocksDbStorage::new(path).await + } +} + +#[async_trait] +impl crate::Space for RocksDbStorage { + async fn get_space_usage(&self) -> Result { + crate::get_dir_size(&self.path).await + } +} + #[derive(Clone)] pub struct RocksDbStore { name: String, @@ -90,10 +127,17 @@ pub struct RocksDbStore { } impl RocksDbStore { - pub fn new(db: Arc, name: String) -> Result { + pub(crate) fn new(db: Arc, name: String) -> Result { Ok(RocksDbStore { db, name }) } + async fn remove_range(&self, from: &[u8], to: &[u8]) -> Result<()> { + let cf = self.cf_handle()?; + #[cfg(feature = "rocksdb-multi-thread")] + let cf = &cf; + self.db.delete_range_cf(cf, from, to).map_err(|e| e.into()) + } + /// Returns the column family handle. Unfortunately generated on every call /// due to not being `Sync`, potentially `unsafe` alternatives: /// https://github.com/rust-rocksdb/rust-rocksdb/issues/407 @@ -140,8 +184,71 @@ impl Store for RocksDbStore { } #[async_trait] -impl crate::Space for RocksDbStorage { - async fn get_space_usage(&self) -> Result { - crate::get_dir_size(&self.path).await +impl crate::IterableStore for RocksDbStore { + fn get_all_entries(&self) -> std::pin::Pin>> { + // handle is not Sync; generate the iterator before + // async stream work. + let cf_option = self.db.cf_handle(&self.name); + let iter = if let Some(cf) = cf_option { + #[cfg(feature = "rocksdb-multi-thread")] + let cf = &cf; + Some(self.db.iterator_cf(cf, IteratorMode::Start)) + } else { + None + }; + Box::pin(try_stream! { + let iter = iter.ok_or_else(|| anyhow!("Could not get cf handle."))?; + for entry in iter { + let (key, value) = entry?; + yield (Vec::from(key.as_ref()), Vec::from(value.as_ref())); + } + }) + } +} + +/// A [RocksDbStore] that does not persist data after dropping. +/// Can be created from [IndexedDbStorage::get_ephemeral_store]. +#[derive(Clone)] +pub struct EphemeralRocksDbStore { + store: PartitionedStore, +} + +impl EphemeralRocksDbStore { + pub(crate) fn new(store: PartitionedStore) -> Self { + EphemeralRocksDbStore { store } + } +} + +#[async_trait] +impl Store for EphemeralRocksDbStore { + async fn read(&self, key: &[u8]) -> Result>> { + self.store.read(key).await + } + + async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result>> { + self.store.write(key, bytes).await + } + + async fn remove(&mut self, key: &[u8]) -> Result>> { + self.store.remove(key).await + } + + async fn flush(&self) -> Result<()> { + self.store.flush().await + } +} + +#[async_trait] +impl crate::Disposable for EphemeralRocksDbStore { + async fn dispose(&mut self) -> Result<()> { + let (start_key, end_key) = self.store.get_key_range(); + self.store.inner().remove_range(start_key, end_key).await + } +} + +#[async_trait] +impl crate::IterableStore for EphemeralRocksDbStore { + fn get_all_entries(&self) -> std::pin::Pin>> { + self.store.inner().get_all_entries() } } diff --git a/rust/noosphere-storage/src/implementation/sled.rs b/rust/noosphere-storage/src/implementation/sled.rs index afab87527..74498151f 100644 --- a/rust/noosphere-storage/src/implementation/sled.rs +++ b/rust/noosphere-storage/src/implementation/sled.rs @@ -1,12 +1,16 @@ -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use crate::storage::Storage; use crate::store::Store; -use anyhow::Result; +use anyhow::{anyhow, Result}; +use async_stream::try_stream; use async_trait::async_trait; +use noosphere_common::ConditionalSend; use sled::{Db, Tree}; +pub(crate) const EPHEMERAL_SLED_PREFIX: &str = "EPHEMERAL-SLED-STORAGE"; + pub enum SledStorageInit { Path(PathBuf), Db(Db), @@ -15,28 +19,46 @@ pub enum SledStorageInit { #[derive(Clone, Debug)] pub struct SledStorage { db: Db, - #[allow(unused)] - path: Option, + _path: PathBuf, } impl SledStorage { - pub fn new(init: SledStorageInit) -> Result { - let mut db_path = None; - let db: Db = match init { - SledStorageInit::Path(path) => { - std::fs::create_dir_all(&path)?; - db_path = Some(path.clone().canonicalize()?); - sled::open(path)? - } - SledStorageInit::Db(db) => db, - }; - - Ok(SledStorage { db, path: db_path }) + /// Open or create a database at directory `path`. + pub fn new>(path: P) -> Result { + std::fs::create_dir_all(path.as_ref())?; + let db_path = path.as_ref().canonicalize()?; + let db = sled::open(&db_path)?; + + let storage = SledStorage { db, _path: db_path }; + storage.clear_ephemeral()?; + Ok(storage) } async fn get_store(&self, name: &str) -> Result { Ok(SledStore::new(&self.db.open_tree(name)?)) } + + #[cfg(test)] + pub(crate) fn inner(&self) -> &Db { + &self.db + } + + /// Wipes all "ephemeral" trees. + fn clear_ephemeral(&self) -> Result<()> { + for name in self.db.tree_names() { + let tree_name = String::from_utf8(Vec::from(name.as_ref()))?; + if tree_name.starts_with(EPHEMERAL_SLED_PREFIX) { + match self.db.drop_tree(tree_name.as_bytes())? { + true => continue, + false => { + warn!("Could not drop ephemeral tree {}", tree_name); + continue; + } + } + } + } + Ok(()) + } } #[async_trait] @@ -54,13 +76,42 @@ impl Storage for SledStorage { } } +impl Drop for SledStorage { + fn drop(&mut self) { + let _ = self.db.flush(); + } +} + +#[async_trait] +impl crate::EphemeralStorage for SledStorage { + type EphemeralStoreType = EphemeralSledStore; + + async fn get_ephemeral_store(&self) -> Result> { + Ok(EphemeralSledStore::new(self.db.clone())?.into()) + } +} + +#[async_trait] +impl crate::OpenStorage for SledStorage { + async fn open + ConditionalSend>(path: P) -> Result { + SledStorage::new(path) + } +} + +#[async_trait] +impl crate::Space for SledStorage { + async fn get_space_usage(&self) -> Result { + self.db.size_on_disk().map_err(|e| e.into()) + } +} + #[derive(Clone)] pub struct SledStore { db: Tree, } impl SledStore { - pub fn new(db: &Tree) -> Self { + pub(crate) fn new(db: &Tree) -> Self { SledStore { db: db.clone() } } } @@ -98,15 +149,63 @@ impl Store for SledStore { } } -impl Drop for SledStorage { - fn drop(&mut self) { - let _ = self.db.flush(); +/// A [SledStore] that does not persist data after dropping. +/// Can be created from [SledStorage]'s [crate::EphemeralStorage] implementation. +#[derive(Clone)] +pub struct EphemeralSledStore { + db: Db, + name: String, + store: SledStore, +} + +impl EphemeralSledStore { + pub(crate) fn new(db: Db) -> Result { + let name = format!("{}-{}", EPHEMERAL_SLED_PREFIX, rand::random::()); + let store = SledStore::new(&db.open_tree(&name)?); + Ok(EphemeralSledStore { db, store, name }) } } #[async_trait] -impl crate::Space for SledStorage { - async fn get_space_usage(&self) -> Result { - self.db.size_on_disk().map_err(|e| e.into()) +impl Store for EphemeralSledStore { + async fn read(&self, key: &[u8]) -> Result>> { + self.store.read(key).await + } + + async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result>> { + self.store.write(key, bytes).await + } + + async fn remove(&mut self, key: &[u8]) -> Result>> { + self.store.remove(key).await + } + + async fn flush(&self) -> Result<()> { + self.store.flush().await + } +} + +#[async_trait] +impl crate::IterableStore for SledStore { + fn get_all_entries(&self) -> std::pin::Pin>> { + Box::pin(try_stream! { + for entry in self.db.iter() { + let (key, value) = entry?; + yield (Vec::from(key.as_ref()), Vec::from(value.as_ref())); + } + }) + } +} + +#[async_trait] +impl crate::Disposable for EphemeralSledStore { + async fn dispose(&mut self) -> Result<()> { + self.db.drop_tree(&self.name).map_or_else( + |e| Err(e.into()), + |bool_state| match bool_state { + true => Ok(()), + false => Err(anyhow!("Could not clear temporary tree.")), + }, + ) } } diff --git a/rust/noosphere-storage/src/implementation/tracking.rs b/rust/noosphere-storage/src/implementation/tracking.rs index ba690d755..32d24fb9e 100644 --- a/rust/noosphere-storage/src/implementation/tracking.rs +++ b/rust/noosphere-storage/src/implementation/tracking.rs @@ -103,3 +103,16 @@ impl Storage for TrackingStorage { Ok(key_value_store) } } + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl crate::EphemeralStorage for TrackingStorage +where + S: Storage + crate::EphemeralStorage, +{ + type EphemeralStoreType = ::EphemeralStoreType; + + async fn get_ephemeral_store(&self) -> Result> { + self.storage.get_ephemeral_store().await + } +} diff --git a/rust/noosphere-storage/src/lib.rs b/rust/noosphere-storage/src/lib.rs index 17835e0d8..418bf38f3 100644 --- a/rust/noosphere-storage/src/lib.rs +++ b/rust/noosphere-storage/src/lib.rs @@ -12,7 +12,12 @@ mod key_value; mod db; mod encoding; +mod ephemeral; +mod non_persistent; +mod ops; +mod partitioned; mod retry; +mod space; mod storage; mod store; mod tap; @@ -22,22 +27,35 @@ pub use crate::ucan::*; pub use block::*; pub use db::*; pub use encoding::*; +pub use ephemeral::*; pub use implementation::*; pub use key_value::*; +pub use non_persistent::*; +pub use ops::*; +pub use partitioned::*; pub use retry::*; +pub use space::*; pub use storage::*; pub use store::*; pub use tap::*; -mod space; -pub use space::*; - #[cfg(test)] -pub mod helpers; +mod inner { + #[cfg(all(not(target_arch = "wasm32"), not(feature = "rocksdb")))] + pub type PreferredPlatformStorage = crate::SledStorage; + #[cfg(all(not(target_arch = "wasm32"), feature = "rocksdb"))] + pub type PreferredPlatformStorage = crate::RocksDbStorage; + #[cfg(target_arch = "wasm32")] + pub type PreferredPlatformStorage = crate::IndexedDbStorage; +} +#[cfg(test)] +pub use inner::*; #[cfg(test)] mod tests { - use crate::{block::BlockStore, helpers::make_disposable_store}; + use crate::{ + block::BlockStore, NonPersistentStorage, PreferredPlatformStorage, Storage, BLOCK_STORE, + }; use libipld_cbor::DagCborCodec; #[cfg(target_arch = "wasm32")] @@ -47,13 +65,15 @@ mod tests { #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] - async fn it_can_store_and_retrieve_bytes() { - let mut storage = make_disposable_store().await.unwrap(); + async fn it_can_store_and_retrieve_bytes() -> anyhow::Result<()> { + let storage = NonPersistentStorage::::new().await?; + let mut store = storage.get_block_store(BLOCK_STORE).await?; let bytes = b"I love every kind of cat"; - let cid = storage.save::(bytes).await.unwrap(); - let retrieved = storage.load::>(&cid).await.unwrap(); + let cid = store.save::(bytes).await.unwrap(); + let retrieved = store.load::>(&cid).await.unwrap(); assert_eq!(retrieved, bytes); + Ok(()) } } diff --git a/rust/noosphere-storage/src/non_persistent.rs b/rust/noosphere-storage/src/non_persistent.rs new file mode 100644 index 000000000..a55f06842 --- /dev/null +++ b/rust/noosphere-storage/src/non_persistent.rs @@ -0,0 +1,88 @@ +use crate::{ops::OpenStorage, storage::Storage}; +use anyhow::Result; +use std::ops::{Deref, DerefMut}; + +#[cfg(doc)] +use crate::EphemeralStorage; +#[cfg(doc)] +use crate::MemoryStorage; + +/// [Storage] provider wrapper that does not persist after dropping. +/// +/// Whereas [EphemeralStorage] can provide a slice of a storage system +/// as non-persistent storage space, the entirety of [NonPersistentStorage] +/// is wiped after dropping. +/// +/// Currently, native builds create a temp dir syncing lifetimes, and web +/// builds use a randomly generated database name. +/// In the future, we may have web builds that use +/// a file-system backed Storage, or native builds that do not use +/// the file-system (currently the case with [MemoryStorage]), where +/// a more complex configuration is needed. Mostly used in tests. +pub struct NonPersistentStorage +where + S: Storage + OpenStorage, +{ + inner: S, + #[cfg(not(target_arch = "wasm32"))] + _temp_dir: tempfile::TempDir, +} + +impl NonPersistentStorage +where + S: Storage + OpenStorage, +{ + /// Create a new [NonPersistentStorage], wrapping a new [Storage] + /// that will be cleared after dropping. + pub async fn new() -> Result { + #[cfg(target_arch = "wasm32")] + let key: String = witty_phrase_generator::WPGen::new() + .with_words(3) + .unwrap() + .into_iter() + .map(|word| String::from(word)) + .collect(); + #[cfg(target_arch = "wasm32")] + let inner = S::open(&key).await?; + #[cfg(target_arch = "wasm32")] + let out = Self { inner }; + + #[cfg(not(target_arch = "wasm32"))] + let _temp_dir = tempfile::TempDir::new()?; + #[cfg(not(target_arch = "wasm32"))] + let inner = S::open(_temp_dir.path()).await?; + #[cfg(not(target_arch = "wasm32"))] + let out = Self { _temp_dir, inner }; + + Ok(out) + } +} + +impl Deref for NonPersistentStorage +where + S: Storage + OpenStorage, +{ + type Target = S; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for NonPersistentStorage +where + S: Storage + OpenStorage, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl AsRef for NonPersistentStorage +where + S: Storage + OpenStorage, +{ + fn as_ref(&self) -> &S { + &self.inner + } +} diff --git a/rust/noosphere-storage/src/ops.rs b/rust/noosphere-storage/src/ops.rs new file mode 100644 index 000000000..51e5e52f7 --- /dev/null +++ b/rust/noosphere-storage/src/ops.rs @@ -0,0 +1,13 @@ +use crate::storage::Storage; +use anyhow::Result; +use async_trait::async_trait; +use noosphere_common::ConditionalSend; +use std::path::Path; + +/// [Storage] that can be opened via [Path] reference. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait OpenStorage: Storage + Sized { + /// Open [Storage] at `path`. + async fn open + ConditionalSend>(path: P) -> Result; +} diff --git a/rust/noosphere-storage/src/partitioned.rs b/rust/noosphere-storage/src/partitioned.rs new file mode 100644 index 000000000..003a3103c --- /dev/null +++ b/rust/noosphere-storage/src/partitioned.rs @@ -0,0 +1,82 @@ +use crate::{Disposable, Store}; +use anyhow::Result; +use async_trait::async_trait; + +/// Maps all [Store] method calls with a key to a prefixed form. +#[derive(Clone)] +pub struct PartitionedStore +where + S: Store, +{ + store: S, + partition_key: Vec, + end_partition_key: Vec, +} + +impl PartitionedStore +where + S: Store, +{ + pub fn new(store: S) -> Self { + let prefix: Vec = format!("{:<10}", rand::random::()).into(); + Self::with_partition_key(store, prefix) + } + + pub fn with_partition_key(store: S, partition_key: Vec) -> Self { + let mut end_partition_key = partition_key.clone(); + end_partition_key.push(u8::MAX); + Self { + store, + partition_key, + end_partition_key, + } + } + + pub fn get_key_range(&self) -> (&Vec, &Vec) { + (&self.partition_key, &self.end_partition_key) + } + + pub fn inner(&self) -> &S { + &self.store + } + + fn map_key(&self, key: &[u8]) -> Vec { + let mut new_key = self.partition_key.clone(); + new_key.extend(key); + new_key + } +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Store for PartitionedStore +where + S: Store, +{ + async fn read(&self, key: &[u8]) -> Result>> { + self.store.read(&self.map_key(key)).await + } + + async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result>> { + self.store.write(&self.map_key(key), bytes).await + } + + async fn remove(&mut self, key: &[u8]) -> Result>> { + self.store.remove(&self.map_key(key)).await + } + + async fn flush(&self) -> Result<()> { + self.store.flush().await + } +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Disposable for PartitionedStore +where + S: Store + Disposable, +{ + async fn dispose(&mut self) -> Result<()> { + self.store.dispose().await + } +} diff --git a/rust/noosphere-storage/src/storage.rs b/rust/noosphere-storage/src/storage.rs index 86a585596..cdd7ccaf5 100644 --- a/rust/noosphere-storage/src/storage.rs +++ b/rust/noosphere-storage/src/storage.rs @@ -1,5 +1,4 @@ -use crate::block::BlockStore; -use crate::key_value::KeyValueStore; +use crate::{block::BlockStore, ephemeral::EphemeralStorage, key_value::KeyValueStore}; use anyhow::Result; use async_trait::async_trait; use noosphere_common::ConditionalSync; @@ -13,7 +12,7 @@ use std::fmt::Debug; /// other Noosphere constructs. #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -pub trait Storage: Clone + ConditionalSync + Debug { +pub trait Storage: EphemeralStorage + Clone + ConditionalSync + Debug { type BlockStore: BlockStore; type KeyValueStore: KeyValueStore; diff --git a/rust/noosphere-storage/src/store.rs b/rust/noosphere-storage/src/store.rs index 2cef23a5c..51c607101 100644 --- a/rust/noosphere-storage/src/store.rs +++ b/rust/noosphere-storage/src/store.rs @@ -1,9 +1,10 @@ -use std::io::Cursor; +use std::{io::Cursor, pin::Pin}; use crate::{block::BlockStore, key_value::KeyValueStore}; use anyhow::Result; use async_trait::async_trait; use cid::Cid; +use futures::Stream; use libipld_cbor::DagCborCodec; use libipld_core::{ codec::{Codec, Decode}, @@ -37,6 +38,21 @@ pub trait Store: Clone + ConditionalSync { } } +/// An async stream of key/value pairs from an [IterableStore]. +#[cfg(not(target_arch = "wasm32"))] +pub type IterableStoreStream<'a> = dyn Stream, Vec)>> + Send + 'a; +/// An async stream of key/value pairs from an [IterableStore]. +#[cfg(target_arch = "wasm32")] +pub type IterableStoreStream<'a> = dyn Stream, Vec)>> + 'a; + +/// A store that can iterate over all of its entries. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait IterableStore { + /// Retrieve all key/value pairs from this store as an async stream. + fn get_all_entries(&self) -> Pin>>; +} + #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl BlockStore for S @@ -104,3 +120,37 @@ where Store::flush(self).await } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{NonPersistentStorage, PreferredPlatformStorage, Storage, LINK_STORE}; + use std::collections::HashMap; + use tokio_stream::StreamExt; + + #[cfg(target_arch = "wasm32")] + use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure}; + #[cfg(target_arch = "wasm32")] + wasm_bindgen_test_configure!(run_in_browser); + + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + pub async fn iterable_stores_get_all_entries() -> Result<()> { + let storage = NonPersistentStorage::::new().await?; + let mut store = storage.get_key_value_store(LINK_STORE).await?; + store.write(&[1], &[11]).await?; + store.write(&[2], &[22]).await?; + store.write(&[3], &[33]).await?; + let mut stream = store.get_all_entries(); + + let mut results = HashMap::new(); + while let Some((key, value)) = stream.try_next().await? { + results.insert(key, value); + } + assert_eq!(results.len(), 3); + assert_eq!(results.get(&vec![1]), Some(&vec![11u8])); + assert_eq!(results.get(&vec![2]), Some(&vec![22u8])); + assert_eq!(results.get(&vec![3]), Some(&vec![33u8])); + Ok(()) + } +} diff --git a/rust/noosphere-storage/src/tap.rs b/rust/noosphere-storage/src/tap.rs index b93684bfa..12d181d42 100644 --- a/rust/noosphere-storage/src/tap.rs +++ b/rust/noosphere-storage/src/tap.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use cid::Cid; use tokio::sync::mpsc::{channel, Receiver, Sender}; -/// Wraps any [BlockStore] and "taps" it by cloning any block successfully +/// Wraps any [BlockStoreView] and "taps" it by cloning any block successfully /// retrieved from the store and sending over an MPSC channel. This allows an /// observer to record all the blocks needed to load arbitrarily deep and /// complex DAGs into memory without orchestrating a dedicated callback for the diff --git a/rust/noosphere-storage/src/ucan.rs b/rust/noosphere-storage/src/ucan.rs index 0b238f57f..25b11e792 100644 --- a/rust/noosphere-storage/src/ucan.rs +++ b/rust/noosphere-storage/src/ucan.rs @@ -27,7 +27,7 @@ impl UcanStoreTrait for UcanStore { } } -impl Clone for UcanStore { +impl Clone for UcanStore { fn clone(&self) -> Self { UcanStore(self.0.clone()) } diff --git a/rust/noosphere/src/sphere/builder/mod.rs b/rust/noosphere/src/sphere/builder/mod.rs index 86d513df3..3c61e337a 100644 --- a/rust/noosphere/src/sphere/builder/mod.rs +++ b/rust/noosphere/src/sphere/builder/mod.rs @@ -285,7 +285,7 @@ pub(crate) async fn generate_db( ipfs_gateway_url.map(|url| GatewayClient::new(url)), ); - SphereDb::new(&storage).await + SphereDb::new(storage).await } #[cfg(test)] diff --git a/rust/noosphere/src/storage.rs b/rust/noosphere/src/storage.rs index e9631eaef..20e80e1c1 100644 --- a/rust/noosphere/src/storage.rs +++ b/rust/noosphere/src/storage.rs @@ -51,13 +51,11 @@ impl StorageLayout { pub(crate) async fn to_storage(&self) -> Result { #[cfg(sled)] { - noosphere_storage::SledStorage::new(noosphere_storage::SledStorageInit::Path( - PathBuf::from(self), - )) + noosphere_storage::SledStorage::new(PathBuf::from(self)) } #[cfg(rocksdb)] { - noosphere_storage::RocksDbStorage::new(PathBuf::from(self)) + noosphere_storage::RocksDbStorage::new(PathBuf::from(self)).await } } }