Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions kvdb-web/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ license = "GPL-3.0"
edition = "2018"

[dependencies]
wasm-bindgen = "0.2.51"
js-sys = "0.3.28"
wasm-bindgen = "0.2.54"
js-sys = "0.3.31"
kvdb = { version = "0.1", path = "../kvdb" }
kvdb-memorydb = { version = "0.1", path = "../kvdb-memorydb" }
futures-preview = "0.3.0-alpha.19"
futures = "0.3"
log = "0.4.8"
send_wrapper = "0.3.0"

[dependencies.web-sys]
version = "0.3.28"
version = "0.3.31"
Comment thread
ordian marked this conversation as resolved.
features = [
'console',
'Window',
Expand All @@ -37,7 +37,6 @@ features = [
]

[dev-dependencies]
wasm-bindgen-test = "0.2.49"
futures-preview = { version = "0.3.0-alpha.19", features = ['compat'] }
futures01 = { package = "futures", version = "0.1" }
wasm-bindgen-test = "0.3.4"
console_log = "0.1.2"
wasm-bindgen-futures = "0.4.4"
107 changes: 34 additions & 73 deletions kvdb-web/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ use kvdb::{DBTransaction, DBValue};
use kvdb_memorydb::{self as in_memory, InMemory};
use send_wrapper::SendWrapper;
use std::io;
use std::rc::Rc;
use std::sync::Mutex;

pub use error::Error;
pub use kvdb::KeyValueDB;
Expand All @@ -44,7 +42,7 @@ pub struct Database {
version: u32,
columns: u32,
in_memory: InMemory,
indexed_db: Mutex<SendWrapper<IdbDatabase>>,
indexed_db: SendWrapper<IdbDatabase>,
}

// The default column is represented as `None`.
Expand All @@ -57,71 +55,38 @@ fn number_to_column(col: u32) -> Column {
impl Database {
/// Opens the database with the given name,
/// and the specified number of columns (not including the default one).
pub fn open(name: String, columns: u32) -> impl Future<Output = Result<Database, error::Error>> {
// let's try to open the latest version of the db first
let open_request = indexed_db::open(name.as_str(), None, columns);
pub async fn open(name: String, columns: u32) -> Result<Database, error::Error> {
let name_clone = name.clone();
open_request
.then(move |db| {
let db = match db {
Ok(db) => db,
Err(err) => return future::Either::Right(future::err(err)),
};

// If we need more column than the latest version has,
// then bump the version (+ 1 for the default column).
// In order to bump the version, we close the database
// and reopen it with a higher version than it was opened with previously.
// cf. https://github.com/paritytech/parity-common/pull/202#discussion_r321221751
if columns + 1 > db.columns {
let next_version = db.version + 1;
drop(db);
future::Either::Left(indexed_db::open(name.as_str(), Some(next_version), columns).boxed())
} else {
future::Either::Left(future::ok(db).boxed())
}
// populate the in_memory db from the IndexedDB
})
.then(move |db| {
let db = match db {
Ok(db) => db,
Err(err) => return future::Either::Right(future::err(err)),
};

let indexed_db::IndexedDB { version, inner, .. } = db;
let rc = Rc::new(inner.take());
let weak = Rc::downgrade(&rc);
// read the columns from the IndexedDB
future::Either::Left(
stream::iter(0..=columns)
.map(move |n| {
let db = weak.upgrade().expect("rc should live at least as long; qed");
indexed_db::idb_cursor(&db, n).fold(DBTransaction::new(), move |mut txn, (key, value)| {
let column = number_to_column(n);
txn.put_vec(column, key.as_ref(), value);
future::ready(txn)
})
// write each column into memory
})
.fold(in_memory::create(columns), |m, txn| {
txn.then(|txn| {
m.write_buffered(txn);
future::ready(m)
})
})
.then(move |in_memory| {
future::ok(Database {
name: name_clone,
version,
columns,
in_memory,
indexed_db: Mutex::new(SendWrapper::new(
Rc::try_unwrap(rc).expect("should have only 1 ref at this point; qed"),
)),
})
}),
)
})
// let's try to open the latest version of the db first
let db = indexed_db::open(name.as_str(), None, columns).await?;

// If we need more column than the latest version has,
// then bump the version (+ 1 for the default column).
// In order to bump the version, we close the database
// and reopen it with a higher version than it was opened with previously.
// cf. https://github.com/paritytech/parity-common/pull/202#discussion_r321221751
let db = if columns + 1 > db.columns {
let next_version = db.version + 1;
drop(db);
indexed_db::open(name.as_str(), Some(next_version), columns).await?
} else {
db
};
// populate the in_memory db from the IndexedDB
let indexed_db::IndexedDB { version, inner, .. } = db;
let in_memory = in_memory::create(columns);
// read the columns from the IndexedDB
for n in 0..=columns {
let column = number_to_column(n);
let mut txn = DBTransaction::new();
let mut stream = indexed_db::idb_cursor(&*inner, n);
while let Some((key, value)) = stream.next().await {
txn.put_vec(column, key.as_ref(), value);
}
// write each column into memory
in_memory.write_buffered(txn);
}
Ok(Database { name: name_clone, version, columns, in_memory, indexed_db: inner })
}

/// Get the database name.
Expand All @@ -137,9 +102,7 @@ impl Database {

impl Drop for Database {
fn drop(&mut self) {
if let Ok(db) = self.indexed_db.lock() {
db.close();
}
self.indexed_db.close();
}
}

Expand All @@ -153,9 +116,7 @@ impl KeyValueDB for Database {
}

fn write_buffered(&self, transaction: DBTransaction) {
if let Ok(guard) = self.indexed_db.lock() {
let _ = indexed_db::idb_commit_transaction(&*guard, &transaction, self.columns);
}
let _ = indexed_db::idb_commit_transaction(&*self.indexed_db, &transaction, self.columns);
self.in_memory.write_buffered(transaction);
}

Expand Down
52 changes: 22 additions & 30 deletions kvdb-web/tests/indexed_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,52 +16,44 @@

//! IndexedDB tests.

use futures::compat;
use futures::future::{self, FutureExt as _, TryFutureExt as _};
use futures::future::TryFutureExt as _;

use kvdb_web::{Database, KeyValueDB as _};

use wasm_bindgen::JsValue;
use wasm_bindgen_test::*;

wasm_bindgen_test_configure!(run_in_browser);

#[wasm_bindgen_test(async)]
fn reopen_the_database_with_more_columns() -> impl futures01::Future<Item = (), Error = JsValue> {
#[wasm_bindgen_test]
async fn reopen_the_database_with_more_columns() {
let _ = console_log::init_with_level(log::Level::Trace);

fn open_db(col: u32) -> impl future::Future<Output = Database> {
Database::open("MyAsyncTest".into(), col).unwrap_or_else(|err| panic!("{}", err))
async fn open_db(col: u32) -> Database {
Database::open("MyAsyncTest".into(), col).unwrap_or_else(|err| panic!("{}", err)).await
}

let fut = open_db(1)
.then(|db| {
// Write a value into the database
let mut batch = db.transaction();
batch.put(None, b"hello", b"world");
db.write_buffered(batch);
let db = open_db(1).await;

assert_eq!(db.get(None, b"hello").unwrap().unwrap().as_ref(), b"world");
// Write a value into the database
let mut batch = db.transaction();
batch.put(None, b"hello", b"world");
db.write_buffered(batch);

// Check the database version
assert_eq!(db.version(), 1);
assert_eq!(db.get(None, b"hello").unwrap().unwrap().as_ref(), b"world");

// Close the database
drop(db);
// Check the database version
assert_eq!(db.version(), 1);

// Reopen it again with 3 columns
open_db(3)
})
.map(|db| {
// The value should still be present
assert_eq!(db.get(None, b"hello").unwrap().unwrap().as_ref(), b"world");
assert!(db.get(None, b"trash").unwrap().is_none());
// Close the database
drop(db);

// The version should be bumped
assert_eq!(db.version(), 2);
// Reopen it again with 3 columns
let db = open_db(3).await;

Ok(())
});
// The value should still be present
assert_eq!(db.get(None, b"hello").unwrap().unwrap().as_ref(), b"world");
assert!(db.get(None, b"trash").unwrap().is_none());

compat::Compat::new(fut)
// The version should be bumped
assert_eq!(db.version(), 2);
}