Skip to content
This repository was archived by the owner on Sep 21, 2024. It is now read-only.

Commit a3aa8b5

Browse files
committed
feat: Introduce Importable/Exportable Storage traits.
1 parent 7155f86 commit a3aa8b5

File tree

7 files changed

+218
-36
lines changed

7 files changed

+218
-36
lines changed

rust/noosphere-storage/examples/bench/main.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,7 @@ impl BenchmarkStorage {
128128
))]
129129
let (storage, storage_name) = {
130130
(
131-
noosphere_storage::SledStorage::new(noosphere_storage::SledStorageInit::Path(
132-
storage_path.into(),
133-
))?,
131+
noosphere_storage::SledStorage::new(storage_path.into())?,
134132
"SledDbStorage",
135133
)
136134
};
+177
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
use crate::{Storage, Store, SPHERE_DB_STORE_NAMES};
2+
use anyhow::Result;
3+
use async_trait::async_trait;
4+
use noosphere_common::ConditionalSync;
5+
use std::pin::Pin;
6+
use tokio_stream::{Stream, StreamExt};
7+
8+
#[cfg(target_arch = "wasm32")]
9+
pub type IterableStoreStream<'a> =
10+
Pin<Box<dyn Stream<Item = Result<(Vec<u8>, Option<Vec<u8>>)>> + 'a>>;
11+
#[cfg(not(target_arch = "wasm32"))]
12+
pub type IterableStoreStream<'a> =
13+
Pin<Box<dyn Stream<Item = Result<(Vec<u8>, Option<Vec<u8>>)>> + Send + 'a>>;
14+
15+
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
16+
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
17+
pub trait IterableStore: Store {
18+
fn get_all_entries(&self) -> IterableStoreStream<'_>;
19+
}
20+
21+
/// A blanket implementation for [Storage]s that can be
22+
/// imported via [Importable::import].
23+
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
24+
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
25+
pub trait Exportable
26+
where
27+
Self: Storage,
28+
<Self as Storage>::KeyValueStore: IterableStore,
29+
{
30+
async fn get_all_store_names(&self) -> Result<Vec<String>> {
31+
let mut names = vec![];
32+
names.extend(SPHERE_DB_STORE_NAMES.iter().map(|name| String::from(*name)));
33+
Ok(names)
34+
}
35+
}
36+
37+
impl<S> Exportable for S
38+
where
39+
S: Storage,
40+
S::KeyValueStore: IterableStore,
41+
{
42+
}
43+
44+
/// A blanket implementation for all [Storage]s to import
45+
/// an [Exportable] storage.
46+
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
47+
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
48+
pub trait Importable<'a, E>
49+
where
50+
Self: Storage,
51+
Self::KeyValueStore: Store,
52+
E: Exportable + ConditionalSync + 'a,
53+
<E as Storage>::KeyValueStore: IterableStore,
54+
{
55+
async fn import(&'a mut self, exportable: E) -> Result<()> {
56+
for store_name in exportable.get_all_store_names().await? {
57+
let mut store = self.get_key_value_store(&store_name).await?;
58+
let export_store = exportable.get_key_value_store(&store_name).await?;
59+
{
60+
let mut stream = export_store.get_all_entries();
61+
while let Some((key, value)) = stream.try_next().await? {
62+
if let Some(value) = value {
63+
Store::write(&mut store, key.as_ref(), value.as_ref()).await?;
64+
}
65+
}
66+
}
67+
drop(export_store)
68+
}
69+
Ok(())
70+
}
71+
}
72+
73+
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
74+
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
75+
impl<'a, T, E> Importable<'a, E> for T
76+
where
77+
T: Storage,
78+
T::KeyValueStore: Store,
79+
E: Exportable + ConditionalSync + 'a,
80+
<E as Storage>::KeyValueStore: IterableStore,
81+
{
82+
}
83+
84+
#[cfg(test)]
85+
mod test {
86+
use super::*;
87+
use std::path::PathBuf;
88+
89+
#[cfg(target_arch = "wasm32")]
90+
use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure};
91+
#[cfg(target_arch = "wasm32")]
92+
wasm_bindgen_test_configure!(run_in_browser);
93+
94+
struct TempPath {
95+
#[cfg(target_arch = "wasm32")]
96+
pub path: String,
97+
#[cfg(not(target_arch = "wasm32"))]
98+
pub path: PathBuf,
99+
#[cfg(not(target_arch = "wasm32"))]
100+
#[allow(unused)]
101+
temp_dir: tempfile::TempDir,
102+
}
103+
104+
impl TempPath {
105+
pub fn new() -> Result<Self> {
106+
#[cfg(target_arch = "wasm32")]
107+
let path: String = witty_phrase_generator::WPGen::new()
108+
.with_words(3)
109+
.unwrap()
110+
.into_iter()
111+
.map(|word| String::from(word))
112+
.collect();
113+
114+
#[cfg(not(target_arch = "wasm32"))]
115+
let temp_dir = tempfile::TempDir::new()?;
116+
#[cfg(not(target_arch = "wasm32"))]
117+
let path = temp_dir.path().to_owned();
118+
119+
#[cfg(target_arch = "wasm32")]
120+
let obj = Self { path };
121+
#[cfg(not(target_arch = "wasm32"))]
122+
let obj = Self { temp_dir, path };
123+
124+
Ok(obj)
125+
}
126+
}
127+
128+
/// wasm32: MemoryStorage -> IndexedDbStorage
129+
/// native: SledStorage -> MemoryStorage
130+
/// native+rocks: SledStorage -> RocksDbStorage
131+
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
132+
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
133+
pub async fn it_can_migrate_to_new_storage_backend() -> Result<()> {
134+
noosphere_core::tracing::initialize_tracing(None);
135+
136+
#[allow(unused)]
137+
let from_temp_path = TempPath::new()?;
138+
#[allow(unused)]
139+
let to_temp_path = TempPath::new()?;
140+
141+
#[cfg(target_arch = "wasm32")]
142+
let from_storage = crate::MemoryStorage::default();
143+
#[cfg(not(target_arch = "wasm32"))]
144+
let from_storage = crate::SledStorage::new(from_temp_path.path.clone())?;
145+
146+
#[cfg(target_arch = "wasm32")]
147+
let mut to_storage = crate::IndexedDbStorage::new(&to_temp_path.path).await?;
148+
#[cfg(all(feature = "rocksdb", not(target_arch = "wasm32")))]
149+
let mut to_storage = crate::RocksDbStorage::new(to_temp_path.path.clone())?;
150+
#[cfg(all(not(feature = "rocksdb"), not(target_arch = "wasm32")))]
151+
let mut to_storage = crate::MemoryStorage::default();
152+
153+
{
154+
let mut store = from_storage.get_key_value_store("links").await?;
155+
for n in 0..10 {
156+
let bytes = vec![n; 10];
157+
store.write(&[n], bytes.as_ref()).await?;
158+
}
159+
}
160+
161+
to_storage.import(from_storage).await?;
162+
163+
{
164+
let store = to_storage.get_key_value_store("links").await?;
165+
for n in 0..10 {
166+
let expected_bytes = vec![n; 10];
167+
168+
if let Some(bytes) = store.read(&[n]).await? {
169+
assert_eq!(bytes, expected_bytes);
170+
} else {
171+
panic!("Expected key `{n}` to exist in new db");
172+
}
173+
}
174+
}
175+
Ok(())
176+
}
177+
}

rust/noosphere-storage/src/helpers.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::Storage;
22
use anyhow::Result;
33

44
#[cfg(not(target_arch = "wasm32"))]
5-
use crate::{SledStorage, SledStorageInit, SledStore};
5+
use crate::{SledStorage, SledStore};
66

77
#[cfg(not(target_arch = "wasm32"))]
88
pub async fn make_disposable_store() -> Result<SledStore> {
@@ -13,7 +13,7 @@ pub async fn make_disposable_store() -> Result<SledStore> {
1313
.into_iter()
1414
.map(String::from)
1515
.collect();
16-
let provider = SledStorage::new(SledStorageInit::Path(temp_dir.join(temp_name)))?;
16+
let provider = SledStorage::new(temp_dir.join(temp_name))?;
1717
provider.get_block_store("foo").await
1818
}
1919

rust/noosphere-storage/src/implementation/memory.rs

+14
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use anyhow::{anyhow, Result};
2+
use async_stream::try_stream;
23
use async_trait::async_trait;
34
use cid::Cid;
45
use std::{collections::HashMap, sync::Arc};
@@ -131,6 +132,19 @@ impl Store for MemoryStore {
131132
}
132133
}
133134

135+
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
136+
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
137+
impl crate::IterableStore for MemoryStore {
138+
fn get_all_entries(&self) -> crate::IterableStoreStream<'_> {
139+
Box::pin(try_stream! {
140+
let dags = self.entries.lock().await;
141+
for key in dags.keys() {
142+
yield (key.to_owned(), dags.get(key).cloned());
143+
}
144+
})
145+
}
146+
}
147+
134148
#[cfg(feature = "performance")]
135149
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
136150
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]

rust/noosphere-storage/src/implementation/sled.rs

+18-23
Original file line numberDiff line numberDiff line change
@@ -4,32 +4,22 @@ use crate::storage::Storage;
44
use crate::store::Store;
55

66
use anyhow::Result;
7+
use async_stream::try_stream;
78
use async_trait::async_trait;
89
use sled::{Db, Tree};
910

10-
pub enum SledStorageInit {
11-
Path(PathBuf),
12-
Db(Db),
13-
}
14-
1511
#[derive(Clone, Debug)]
1612
pub struct SledStorage {
1713
db: Db,
1814
#[allow(unused)]
19-
path: Option<PathBuf>,
15+
path: PathBuf,
2016
}
2117

2218
impl SledStorage {
23-
pub fn new(init: SledStorageInit) -> Result<Self> {
24-
let mut db_path = None;
25-
let db: Db = match init {
26-
SledStorageInit::Path(path) => {
27-
std::fs::create_dir_all(&path)?;
28-
db_path = Some(path.clone().canonicalize()?);
29-
sled::open(path)?
30-
}
31-
SledStorageInit::Db(db) => db,
32-
};
19+
pub fn new(path: PathBuf) -> Result<Self> {
20+
std::fs::create_dir_all(&path)?;
21+
let db_path = path.canonicalize()?;
22+
let db = sled::open(&db_path)?;
3323

3424
Ok(SledStorage { db, path: db_path })
3525
}
@@ -104,16 +94,21 @@ impl Drop for SledStorage {
10494
}
10595
}
10696

97+
impl crate::IterableStore for SledStore {
98+
fn get_all_entries(&self) -> crate::IterableStoreStream<'_> {
99+
Box::pin(try_stream! {
100+
for entry in self.db.iter() {
101+
let (key, value) = entry?;
102+
yield (Vec::from(key.as_ref()), Some(Vec::from(value.as_ref())));
103+
}
104+
})
105+
}
106+
}
107+
107108
#[cfg(feature = "performance")]
108109
#[async_trait]
109110
impl crate::Space for SledStorage {
110111
async fn get_space_usage(&self) -> Result<u64> {
111-
if let Some(path) = &self.path {
112-
crate::get_dir_size(path).await
113-
} else {
114-
Err(anyhow::anyhow!(
115-
"Could not calculate storage space, requires usage of a path constructor."
116-
))
117-
}
112+
crate::get_dir_size(&self.path).await
118113
}
119114
}

rust/noosphere-storage/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ mod key_value;
1212

1313
mod db;
1414
mod encoding;
15+
mod exportable;
1516
mod retry;
1617
mod storage;
1718
mod store;
@@ -22,6 +23,7 @@ pub use crate::ucan::*;
2223
pub use block::*;
2324
pub use db::*;
2425
pub use encoding::*;
26+
pub use exportable::*;
2527
pub use implementation::*;
2628
pub use key_value::*;
2729
pub use retry::*;

rust/noosphere/src/storage.rs

+4-8
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,11 @@ impl From<StorageLayout> for PathBuf {
4545
impl StorageLayout {
4646
pub async fn to_storage(&self) -> Result<PrimitiveStorage> {
4747
#[cfg(sled)]
48-
{
49-
noosphere_storage::SledStorage::new(noosphere_storage::SledStorageInit::Path(
50-
PathBuf::from(self),
51-
))
52-
}
48+
let storage = noosphere_storage::SledStorage::new(PathBuf::from(self));
5349
#[cfg(rocksdb)]
54-
{
55-
noosphere_storage::RocksDbStorage::new(PathBuf::from(self))
56-
}
50+
let storage = noosphere_storage::RocksDbStorage::new(PathBuf::from(self));
51+
52+
storage
5753
}
5854
}
5955

0 commit comments

Comments
 (0)