Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
feat: Introduce storage migrations via ImportStorage/ExportStorage tr…
Browse files Browse the repository at this point in the history
…aits, and other trait-based operations.
  • Loading branch information
jsantell committed Sep 19, 2023
1 parent 7155f86 commit e12d9d5
Show file tree
Hide file tree
Showing 12 changed files with 596 additions and 130 deletions.
6 changes: 2 additions & 4 deletions rust/noosphere-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,19 @@ libipld-cbor = { workspace = true }
serde = { workspace = true }
base64 = "=0.21.2"
url = { version = "^2" }
witty-phrase-generator = "~0.2"

[dev-dependencies]
witty-phrase-generator = "~0.2"
wasm-bindgen-test = { workspace = true }
rand = { workspace = true }
# examples/bench
noosphere-core = { version = "0.15.2", path = "../noosphere-core", features = ["helpers"] }
# examples/bench
noosphere-common = { version = "0.1.0", path = "../noosphere-common", features = ["helpers"] }

[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
tempfile = { workspace = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
sled = "~0.34"
tempfile = { workspace = true }
tokio = { workspace = true, features = ["full"] }
rocksdb = { version = "0.21.0", optional = true }

Expand Down
6 changes: 2 additions & 4 deletions rust/noosphere-storage/examples/bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,15 @@ impl BenchmarkStorage {
))]
let (storage, storage_name) = {
(
noosphere_storage::SledStorage::new(noosphere_storage::SledStorageInit::Path(
storage_path.into(),
))?,
noosphere_storage::SledStorage::new(&storage_path)?,
"SledDbStorage",
)
};

#[cfg(all(not(target_arch = "wasm32"), feature = "rocksdb"))]
let (storage, storage_name) = {
(
noosphere_storage::RocksDbStorage::new(storage_path.into())?,
noosphere_storage::RocksDbStorage::new(&storage_path)?,
"RocksDbStorage",
)
};
Expand Down
94 changes: 94 additions & 0 deletions rust/noosphere-storage/src/extra.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use crate::storage::Storage;
use anyhow::Result;
use async_trait::async_trait;
use noosphere_common::ConditionalSend;
use std::{fs, path::Path};

/// [Storage] that can be opened via [Path] reference.
/// [FsBackedStorage] types get a blanket implementation.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait OpenStorage: Storage + Sized {
async fn open<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<Self>;
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<T> OpenStorage for T
where
T: FsBackedStorage,
{
async fn open<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<Self> {
FsBackedStorage::open(path).await
}
}

/// [Storage] that can be deleted via [Path] reference.
/// [FsBackedStorage] types get a blanket implementation.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait RemoveStorage: Storage + Sized {
async fn remove<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<()>;
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<T> RemoveStorage for T
where
T: FsBackedStorage,
{
async fn remove<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<()> {
<T as FsBackedStorage>::remove(path).await
}
}

/// [Storage] that can be moved/renamed via [Path] reference.
/// [FsBackedStorage] types get a blanket implementation.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait RenameStorage: Storage + Sized {
async fn rename<P: AsRef<Path> + ConditionalSend, Q: AsRef<Path> + ConditionalSend>(
from: P,
to: Q,
) -> Result<()>;
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<T> RenameStorage for T
where
T: FsBackedStorage,
{
async fn rename<P: AsRef<Path> + ConditionalSend, Q: AsRef<Path> + ConditionalSend>(
from: P,
to: Q,
) -> Result<()> {
<T as FsBackedStorage>::rename(from, to).await
}
}

/// [Storage] that is based on a file system. Implementing [FsBackedStorage]
/// provides blanket implementations for other trait-based [Storage] operations.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait FsBackedStorage: Storage + Sized {
/// Opens the storage at `path`.
async fn open<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<Self>;

/// Deletes the storage located at `path` directory. Returns `Ok(())` if
/// the directory is successfully removed, or if it already does not exist.
async fn remove<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<()> {
match fs::metadata(path.as_ref()) {
Ok(_) => fs::remove_dir_all(path.as_ref()).map_err(|e| e.into()),
Err(_) => Ok(()),
}
}

/// Moves the storage located at `from` to the `to` location.
async fn rename<P: AsRef<Path> + ConditionalSend, Q: AsRef<Path> + ConditionalSend>(
from: P,
to: Q,
) -> Result<()> {
fs::rename(from, to).map_err(|e| e.into())
}
}
34 changes: 0 additions & 34 deletions rust/noosphere-storage/src/helpers.rs

This file was deleted.

151 changes: 106 additions & 45 deletions rust/noosphere-storage/src/implementation/indexed_db.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use crate::store::Store;
use crate::{db::SPHERE_DB_STORE_NAMES, storage::Storage};
use anyhow::{anyhow, Error, Result};
use async_stream::try_stream;
use async_trait::async_trait;
use js_sys::Uint8Array;
use noosphere_common::ConditionalSend;
use rexie::{
KeyRange, ObjectStore, Rexie, RexieBuilder, Store as IdbStore, Transaction, TransactionMode,
};
use std::{fmt::Debug, rc::Rc};
use std::{fmt::Debug, path::Path, rc::Rc};
use wasm_bindgen::{JsCast, JsValue};

pub const INDEXEDDB_STORAGE_VERSION: u32 = 1;
Expand Down Expand Up @@ -69,7 +71,12 @@ impl IndexedDbStorage {
let db = Rc::into_inner(self.db)
.ok_or_else(|| anyhow!("Could not unwrap inner during database clear."))?;
db.close();
Rexie::delete(&name)
Self::delete(&name).await
}

/// Deletes database with key `db_name` from origin storage.
pub async fn delete(db_name: &str) -> Result<()> {
Rexie::delete(db_name)
.await
.map_err(|error| anyhow!("{:?}", error))
}
Expand All @@ -90,6 +97,30 @@ impl Storage for IndexedDbStorage {
}
}

#[async_trait(?Send)]
impl crate::extra::OpenStorage for IndexedDbStorage {
async fn open<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<Self> {
IndexedDbStorage::new(
path.as_ref()
.to_str()
.ok_or_else(|| anyhow!("Could not stringify path."))?,
)
.await
}
}

#[async_trait(?Send)]
impl crate::extra::RemoveStorage for IndexedDbStorage {
async fn remove<P: AsRef<Path> + ConditionalSend>(path: P) -> Result<()> {
Self::delete(
path.as_ref()
.to_str()
.ok_or_else(|| anyhow!("Could not stringify path."))?,
)
.await
}
}

#[derive(Clone)]
pub struct IndexedDbStore {
db: Rc<Rexie>,
Expand All @@ -114,87 +145,104 @@ impl IndexedDbStore {
Ok(())
}

fn bytes_to_typed_array(bytes: &[u8]) -> Result<JsValue> {
let array = Uint8Array::new_with_length(bytes.len() as u32);
array.copy_from(&bytes);
Ok(JsValue::from(array))
}

async fn contains(key: &JsValue, store: &IdbStore) -> Result<bool> {
async fn contains(key: &[u8], store: &IdbStore) -> Result<bool> {
let key_js = bytes_to_typed_array(key)?;
let count = store
.count(Some(
&KeyRange::only(key).map_err(|error| anyhow!("{:?}", error))?,
&KeyRange::only(&key_js).map_err(|error| anyhow!("{:?}", error))?,
))
.await
.map_err(|error| anyhow!("{:?}", error))?;
Ok(count > 0)
}

async fn read(key: &JsValue, store: &IdbStore) -> Result<Option<Vec<u8>>> {
async fn read(key: &[u8], store: &IdbStore) -> Result<Option<Vec<u8>>> {
let key_js = bytes_to_typed_array(key)?;
Ok(match IndexedDbStore::contains(&key, &store).await? {
true => Some(
true => Some(typed_array_to_bytes(
store
.get(&key)
.get(&key_js)
.await
.map_err(|error| anyhow!("{:?}", error))?
.dyn_into::<Uint8Array>()
.map_err(|error| anyhow!("{:?}", error))?
.to_vec(),
),
.map_err(|error| anyhow!("{:?}", error))?,
)?),
false => None,
})
}

async fn put(key: &[u8], value: &[u8], store: &IdbStore) -> Result<()> {
let key_js = bytes_to_typed_array(key)?;
let value_js = bytes_to_typed_array(value)?;
store
.put(&value_js, Some(&key_js))
.await
.map_err(|error| anyhow!("{:?}", error))?;
Ok(())
}

async fn delete(key: &[u8], store: &IdbStore) -> Result<()> {
let key_js = bytes_to_typed_array(key)?;
store
.delete(&key_js)
.await
.map_err(|error| anyhow!("{:?}", error))?;
Ok(())
}
}

#[async_trait(?Send)]
impl Store for IndexedDbStore {
async fn read(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
let (store, tx) = self.start_transaction(TransactionMode::ReadOnly)?;
let key = IndexedDbStore::bytes_to_typed_array(key)?;

let maybe_dag = IndexedDbStore::read(&key, &store).await?;

let maybe_dag = IndexedDbStore::read(key, &store).await?;
IndexedDbStore::finish_transaction(tx).await?;

Ok(maybe_dag)
}

async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result<Option<Vec<u8>>> {
let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?;

let key = IndexedDbStore::bytes_to_typed_array(key)?;
let value = IndexedDbStore::bytes_to_typed_array(bytes)?;

let old_bytes = IndexedDbStore::read(&key, &store).await?;

store
.put(&value, Some(&key))
.await
.map_err(|error| anyhow!("{:?}", error))?;

IndexedDbStore::put(key, bytes, &store).await?;
IndexedDbStore::finish_transaction(tx).await?;

Ok(old_bytes)
}

async fn remove(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?;

let key = IndexedDbStore::bytes_to_typed_array(key)?;

let old_value = IndexedDbStore::read(&key, &store).await?;

store
.delete(&key)
.await
.map_err(|error| anyhow!("{:?}", error))?;

let old_value = IndexedDbStore::read(key, &store).await?;
IndexedDbStore::delete(key, &store).await?;
IndexedDbStore::finish_transaction(tx).await?;

Ok(old_value)
}
}

impl crate::IterableStore for IndexedDbStore {
fn get_all_entries(&self) -> crate::IterableStoreStream<'_> {
Box::pin(try_stream! {
let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?;
let limit = 100;
let mut offset = 0;
loop {
let results = store.get_all(None, Some(limit), Some(offset), None).await
.map_err(|error| anyhow!("{:?}", error))?;
let count = results.len();
if count == 0 {
IndexedDbStore::finish_transaction(tx).await?;
break;
}

offset += count as u32;

for (key_js, value_js) in results {
yield (
typed_array_to_bytes(JsValue::from(Uint8Array::new(&key_js)))?,
Some(typed_array_to_bytes(value_js)?)
);
}
}
})
}
}

#[cfg(feature = "performance")]
struct SpaceUsageError(Error);

Expand Down Expand Up @@ -263,3 +311,16 @@ impl crate::Space for IndexedDbStorage {
}
}
}

fn bytes_to_typed_array(bytes: &[u8]) -> Result<JsValue> {
let array = Uint8Array::new_with_length(bytes.len() as u32);
array.copy_from(&bytes);
Ok(JsValue::from(array))
}

fn typed_array_to_bytes(js_value: JsValue) -> Result<Vec<u8>> {
Ok(js_value
.dyn_into::<Uint8Array>()
.map_err(|error| anyhow!("{:?}", error))?
.to_vec())
}
Loading

0 comments on commit e12d9d5

Please sign in to comment.