Skip to content
This repository was archived by the owner on Sep 21, 2024. It is now read-only.

Commit 8af27db

Browse files
committed
feat: Introduce storage migrations via Importable/Exportable Storage traits, and other trait-based operations.
1 parent 7155f86 commit 8af27db

File tree

12 files changed

+554
-89
lines changed

12 files changed

+554
-89
lines changed

rust/noosphere-storage/Cargo.toml

+1-3
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,9 @@ noosphere-core = { version = "0.15.2", path = "../noosphere-core", features = ["
4242
# examples/bench
4343
noosphere-common = { version = "0.1.0", path = "../noosphere-common", features = ["helpers"] }
4444

45-
[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
46-
tempfile = { workspace = true }
47-
4845
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
4946
sled = "~0.34"
47+
tempfile = { workspace = true }
5048
tokio = { workspace = true, features = ["full"] }
5149
rocksdb = { version = "0.21.0", optional = true }
5250

rust/noosphere-storage/examples/bench/main.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -128,17 +128,15 @@ impl BenchmarkStorage {
128128
))]
129129
let (storage, storage_name) = {
130130
(
131-
noosphere_storage::SledStorage::new(noosphere_storage::SledStorageInit::Path(
132-
storage_path.into(),
133-
))?,
131+
noosphere_storage::SledStorage::new(&storage_path)?,
134132
"SledDbStorage",
135133
)
136134
};
137135

138136
#[cfg(all(not(target_arch = "wasm32"), feature = "rocksdb"))]
139137
let (storage, storage_name) = {
140138
(
141-
noosphere_storage::RocksDbStorage::new(storage_path.into())?,
139+
noosphere_storage::RocksDbStorage::new(&storage_path)?,
142140
"RocksDbStorage",
143141
)
144142
};

rust/noosphere-storage/src/extra.rs

+94
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
use crate::storage::Storage;
2+
use anyhow::Result;
3+
use async_trait::async_trait;
4+
use noosphere_common::ConditionalSend;
5+
use std::{fs, path::Path};
6+
7+
/// [Storage] that can be opened via [Path] reference.
8+
/// [FsBackedStorage] types get a blanket implementation.
9+
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
10+
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
11+
pub trait StorageOpen: Storage + Sized {
12+
async fn open<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<Self>;
13+
}
14+
15+
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
16+
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
17+
impl<T> StorageOpen for T
18+
where
19+
T: FsBackedStorage,
20+
{
21+
async fn open<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<Self> {
22+
FsBackedStorage::open(path).await
23+
}
24+
}
25+
26+
/// [Storage] that can be deleted via [Path] reference.
27+
/// [FsBackedStorage] types get a blanket implementation.
28+
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
29+
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
30+
pub trait StorageRemove: Storage + Sized {
31+
async fn remove<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<()>;
32+
}
33+
34+
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
35+
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
36+
impl<T> StorageRemove for T
37+
where
38+
T: FsBackedStorage,
39+
{
40+
async fn remove<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<()> {
41+
<T as FsBackedStorage>::remove(path).await
42+
}
43+
}
44+
45+
/// [Storage] that can be moved/renamed via [Path] reference.
46+
/// [FsBackedStorage] types get a blanket implementation.
47+
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
48+
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
49+
pub trait StorageRename: Storage + Sized {
50+
async fn rename<P: AsRef<Path> + ConditionalSend, Q: AsRef<Path> + ConditionalSend>(
51+
from: P,
52+
to: Q,
53+
) -> Result<()>;
54+
}
55+
56+
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
57+
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
58+
impl<T> StorageRename for T
59+
where
60+
T: FsBackedStorage,
61+
{
62+
async fn rename<P: AsRef<Path> + ConditionalSend, Q: AsRef<Path> + ConditionalSend>(
63+
from: P,
64+
to: Q,
65+
) -> Result<()> {
66+
<T as FsBackedStorage>::rename(from, to).await
67+
}
68+
}
69+
70+
/// [Storage] that is based on a file system. Implementing [FsBackedStorage]
71+
/// provides blanket implementations for other trait-based [Storage] operations.
72+
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
73+
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
74+
pub trait FsBackedStorage: Storage + Sized {
75+
/// Opens the storage at `path`.
76+
async fn open<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<Self>;
77+
78+
/// Deletes the storage located at `path` directory. Returns `Ok(())` if
79+
/// the directory is successfully removed, or if it already does not exist.
80+
async fn remove<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<()> {
81+
match fs::metadata(path.as_ref()) {
82+
Ok(_) => fs::remove_dir_all(path.as_ref()).map_err(|e| e.into()),
83+
Err(_) => Ok(()),
84+
}
85+
}
86+
87+
/// Moves the storage located at `from` to the `to` location.
88+
async fn rename<P: AsRef<Path> + ConditionalSend, Q: AsRef<Path> + ConditionalSend>(
89+
from: P,
90+
to: Q,
91+
) -> Result<()> {
92+
fs::rename(from, to).map_err(|e| e.into())
93+
}
94+
}

rust/noosphere-storage/src/helpers.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::Storage;
22
use anyhow::Result;
33

44
#[cfg(not(target_arch = "wasm32"))]
5-
use crate::{SledStorage, SledStorageInit, SledStore};
5+
use crate::{SledStorage, SledStore};
66

77
#[cfg(not(target_arch = "wasm32"))]
88
pub async fn make_disposable_store() -> Result<SledStore> {
@@ -13,7 +13,7 @@ pub async fn make_disposable_store() -> Result<SledStore> {
1313
.into_iter()
1414
.map(String::from)
1515
.collect();
16-
let provider = SledStorage::new(SledStorageInit::Path(temp_dir.join(temp_name)))?;
16+
let provider = SledStorage::new(temp_dir.join(temp_name))?;
1717
provider.get_block_store("foo").await
1818
}
1919

rust/noosphere-storage/src/implementation/indexed_db.rs

+95-45
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use crate::store::Store;
22
use crate::{db::SPHERE_DB_STORE_NAMES, storage::Storage};
33
use anyhow::{anyhow, Error, Result};
4+
use async_stream::try_stream;
45
use async_trait::async_trait;
5-
use js_sys::Uint8Array;
6+
use js_sys::{ArrayBuffer, Uint8Array};
67
use rexie::{
78
KeyRange, ObjectStore, Rexie, RexieBuilder, Store as IdbStore, Transaction, TransactionMode,
89
};
@@ -69,6 +70,11 @@ impl IndexedDbStorage {
6970
let db = Rc::into_inner(self.db)
7071
.ok_or_else(|| anyhow!("Could not unwrap inner during database clear."))?;
7172
db.close();
73+
Self::delete(&name)
74+
}
75+
76+
/// Deletes database with key `db_name` from origin storage.
77+
pub async fn delete(db_name: &str) -> Result<()> {
7278
Rexie::delete(&name)
7379
.await
7480
.map_err(|error| anyhow!("{:?}", error))
@@ -90,7 +96,21 @@ impl Storage for IndexedDbStorage {
9096
}
9197
}
9298

93-
#[derive(Clone)]
99+
#[async_trait(?Send)]
100+
impl crate::extra::StorageOpen for IndexedDbStorage {
101+
async fn open<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<Self> {
102+
IndexedDbStorage::new(&key.as_ref().to_string()).await
103+
}
104+
}
105+
106+
#[async_trait(?Send)]
107+
impl crate::extra::StorageRemove for IndexedDbStorage {
108+
async fn remove<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<Self> {
109+
Self::delete(path).await
110+
}
111+
}
112+
113+
#[derive(Clone, Clone)]
94114
pub struct IndexedDbStore {
95115
db: Rc<Rexie>,
96116
store_name: String,
@@ -114,87 +134,104 @@ impl IndexedDbStore {
114134
Ok(())
115135
}
116136

117-
fn bytes_to_typed_array(bytes: &[u8]) -> Result<JsValue> {
118-
let array = Uint8Array::new_with_length(bytes.len() as u32);
119-
array.copy_from(&bytes);
120-
Ok(JsValue::from(array))
121-
}
122-
123-
async fn contains(key: &JsValue, store: &IdbStore) -> Result<bool> {
137+
async fn contains(key: &[u8], store: &IdbStore) -> Result<bool> {
138+
let key_js = bytes_to_typed_array(key)?;
124139
let count = store
125140
.count(Some(
126-
&KeyRange::only(key).map_err(|error| anyhow!("{:?}", error))?,
141+
&KeyRange::only(&key_js).map_err(|error| anyhow!("{:?}", error))?,
127142
))
128143
.await
129144
.map_err(|error| anyhow!("{:?}", error))?;
130145
Ok(count > 0)
131146
}
132147

133-
async fn read(key: &JsValue, store: &IdbStore) -> Result<Option<Vec<u8>>> {
148+
async fn read(key: &[u8], store: &IdbStore) -> Result<Option<Vec<u8>>> {
149+
let key_js = bytes_to_typed_array(key)?;
134150
Ok(match IndexedDbStore::contains(&key, &store).await? {
135-
true => Some(
151+
true => Some(typed_array_to_bytes(
136152
store
137-
.get(&key)
153+
.get(&key_js)
138154
.await
139-
.map_err(|error| anyhow!("{:?}", error))?
140-
.dyn_into::<Uint8Array>()
141-
.map_err(|error| anyhow!("{:?}", error))?
142-
.to_vec(),
143-
),
155+
.map_err(|error| anyhow!("{:?}", error))?,
156+
)?),
144157
false => None,
145158
})
146159
}
160+
161+
async fn put(key: &[u8], value: &[u8], store: &IdbStore) -> Result<()> {
162+
let key_js = bytes_to_typed_array(key)?;
163+
let value_js = bytes_to_typed_array(value)?;
164+
store
165+
.put(&value_js, Some(&key_js))
166+
.await
167+
.map_err(|error| anyhow!("{:?}", error))?;
168+
Ok(())
169+
}
170+
171+
async fn delete(key: &[u8], store: &IdbStore) -> Result<()> {
172+
let key_js = bytes_to_typed_array(key)?;
173+
store
174+
.delete(&key_js)
175+
.await
176+
.map_err(|error| anyhow!("{:?}", error))?;
177+
Ok(())
178+
}
147179
}
148180

149181
#[async_trait(?Send)]
150182
impl Store for IndexedDbStore {
151183
async fn read(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
152184
let (store, tx) = self.start_transaction(TransactionMode::ReadOnly)?;
153-
let key = IndexedDbStore::bytes_to_typed_array(key)?;
154-
155-
let maybe_dag = IndexedDbStore::read(&key, &store).await?;
156-
185+
let maybe_dag = IndexedDbStore::read(key, &store).await?;
157186
IndexedDbStore::finish_transaction(tx).await?;
158-
159187
Ok(maybe_dag)
160188
}
161189

162190
async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result<Option<Vec<u8>>> {
163191
let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?;
164-
165-
let key = IndexedDbStore::bytes_to_typed_array(key)?;
166-
let value = IndexedDbStore::bytes_to_typed_array(bytes)?;
167-
168192
let old_bytes = IndexedDbStore::read(&key, &store).await?;
169-
170-
store
171-
.put(&value, Some(&key))
172-
.await
173-
.map_err(|error| anyhow!("{:?}", error))?;
174-
193+
IndexedDbStore::put(key, bytes, &store).await?;
175194
IndexedDbStore::finish_transaction(tx).await?;
176-
177195
Ok(old_bytes)
178196
}
179197

180198
async fn remove(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
181199
let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?;
182-
183-
let key = IndexedDbStore::bytes_to_typed_array(key)?;
184-
185-
let old_value = IndexedDbStore::read(&key, &store).await?;
186-
187-
store
188-
.delete(&key)
189-
.await
190-
.map_err(|error| anyhow!("{:?}", error))?;
191-
200+
let old_value = IndexedDbStore::read(key, &store).await?;
201+
IndexedDbStore::delete(key, &store).await?;
192202
IndexedDbStore::finish_transaction(tx).await?;
193-
194203
Ok(old_value)
195204
}
196205
}
197206

207+
impl crate::IterableStore for IndexedDbStore {
208+
fn get_all_entries(&self) -> crate::IterableStoreStream<'_> {
209+
Box::pin(try_stream! {
210+
let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?;
211+
let limit = 100;
212+
let mut offset = 0;
213+
loop {
214+
let results = store.get_all(None, Some(limit), Some(offset), None).await
215+
.map_err(|error| anyhow!("{:?}", error))?;
216+
let count = results.len();
217+
if count == 0 {
218+
IndexedDbStore::finish_transaction(tx).await?;
219+
break;
220+
}
221+
222+
offset += count as u32;
223+
224+
for (key_js, value_js) in results {
225+
yield (
226+
typed_array_to_bytes(JsValue::from(Uint8Array::new(&key_js)))?,
227+
Some(typed_array_to_bytes(value_js)?)
228+
);
229+
}
230+
}
231+
})
232+
}
233+
}
234+
198235
#[cfg(feature = "performance")]
199236
struct SpaceUsageError(Error);
200237

@@ -263,3 +300,16 @@ impl crate::Space for IndexedDbStorage {
263300
}
264301
}
265302
}
303+
304+
fn bytes_to_typed_array(bytes: &[u8]) -> Result<JsValue> {
305+
let array = Uint8Array::new_with_length(bytes.len() as u32);
306+
array.copy_from(&bytes);
307+
Ok(JsValue::from(array))
308+
}
309+
310+
fn typed_array_to_bytes(js_value: JsValue) -> Result<Vec<u8>> {
311+
Ok(js_value
312+
.dyn_into::<Uint8Array>()
313+
.map_err(|error| anyhow!("{:?}", error))?
314+
.to_vec())
315+
}

rust/noosphere-storage/src/implementation/memory.rs

+14
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use anyhow::{anyhow, Result};
2+
use async_stream::try_stream;
23
use async_trait::async_trait;
34
use cid::Cid;
45
use std::{collections::HashMap, sync::Arc};
@@ -131,6 +132,19 @@ impl Store for MemoryStore {
131132
}
132133
}
133134

135+
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
136+
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
137+
impl crate::IterableStore for MemoryStore {
138+
fn get_all_entries(&self) -> crate::IterableStoreStream<'_> {
139+
Box::pin(try_stream! {
140+
let dags = self.entries.lock().await;
141+
for key in dags.keys() {
142+
yield (key.to_owned(), dags.get(key).cloned());
143+
}
144+
})
145+
}
146+
}
147+
134148
#[cfg(feature = "performance")]
135149
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
136150
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]

0 commit comments

Comments
 (0)