Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Rc::try_unwrap for (possibly?) better performance #85

Merged
merged 6 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions wnfs-bench/hamt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use criterion::{
async_executor::AsyncStdExecutor, black_box, criterion_group, criterion_main, BatchSize,
Criterion, Throughput,
};
use proptest::{arbitrary::any, test_runner::TestRunner};
use proptest::{arbitrary::any, collection::vec, test_runner::TestRunner};
use std::{rc::Rc, sync::Arc};
use wnfs::{
dagcbor,
Expand All @@ -19,7 +19,7 @@ fn node_set(c: &mut Criterion) {
let mut store = MemoryBlockStore::default();
let operations = operations(any::<[u8; 32]>(), any::<u64>(), 1_000_000).sample(&mut runner);
let node =
&*async_std::task::block_on(async { node_from_operations(operations, &mut store).await })
&async_std::task::block_on(async { node_from_operations(operations, &mut store).await })
.expect("Couldn't setup HAMT node from operations");

let store = Arc::new(store);
Expand All @@ -32,7 +32,39 @@ fn node_set(c: &mut Criterion) {
(store, kv)
},
|(store, (key, value))| async move {
black_box(node.set(key, value, store.as_ref()).await.unwrap());
black_box(
Rc::clone(node)
.set(key, value, store.as_ref())
.await
.unwrap(),
);
},
BatchSize::SmallInput,
);
});
}

fn node_set_consecutive(c: &mut Criterion) {
let mut runner = TestRunner::deterministic();

c.bench_function("node set 1000 consecutive", |b| {
b.to_async(AsyncStdExecutor).iter_batched(
|| {
let mut store = MemoryBlockStore::default();
let operations =
operations(any::<[u8; 32]>(), any::<u64>(), 1000).sample(&mut runner);
let node = async_std::task::block_on(async {
node_from_operations(operations, &mut store).await
})
.expect("Couldn't setup HAMT node from operations");

let kvs = vec((any::<[u8; 32]>(), any::<u64>()), 1000).sample(&mut runner);
(node, store, kvs)
},
|(mut node, store, kvs)| async move {
for (key, value) in kvs {
node = black_box(node.set(key, value, &store).await.unwrap());
}
},
BatchSize::SmallInput,
);
Expand Down Expand Up @@ -159,6 +191,7 @@ fn hamt_set_encode(c: &mut Criterion) {
criterion_group!(
benches,
node_set,
node_set_consecutive,
node_load_get,
node_load_remove,
hamt_load_decode,
Expand Down
7 changes: 4 additions & 3 deletions wnfs/src/private/forest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ impl PrivateForest {
value: Cid,
store: &mut B,
) -> Result<Rc<Self>> {
let mut cloned = (*self).clone();
// TODO(matheus23): This iterates the path in the HAMT twice.
// We could consider implementing something like upsert instead.
let mut values = self
Expand All @@ -247,8 +246,10 @@ impl PrivateForest {
.cloned()
.unwrap_or_default();
values.insert(value);
cloned.root = self.root.set(name, values, store).await?;
Ok(Rc::new(cloned))

let mut hamt = Rc::try_unwrap(self).unwrap_or_else(|rc| (*rc).clone());
hamt.root = hamt.root.set(name, values, store).await?;
Ok(Rc::new(hamt))
}

/// Gets the encrypted value at the given key.
Expand Down
218 changes: 112 additions & 106 deletions wnfs/src/private/hamt/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use async_recursion::async_recursion;
use async_trait::async_trait;
use bitvec::array::BitArray;

use futures::future::LocalBoxFuture;
use libipld::{serde as ipld_serde, Ipld};
use log::debug;
use serde::{
Expand Down Expand Up @@ -51,6 +52,10 @@ where
hasher: PhantomData<H>,
}

/// A type alias for the return value from `Node::remove_value`
// TODO(matheus23) this is not the best way to handle this.
pub type RemovedResult<K, V, H> = (Rc<Node<K, V, H>>, Option<Pair<K, V>>);

//--------------------------------------------------------------------------------------------------
// Implementations
//--------------------------------------------------------------------------------------------------
Expand All @@ -76,7 +81,7 @@ where
/// assert_eq!(node.get(&String::from("key"), store).await.unwrap(), Some(&42));
/// }
/// ```
pub async fn set<B: BlockStore>(&self, key: K, value: V, store: &B) -> Result<Rc<Self>>
pub async fn set<B: BlockStore>(self: Rc<Self>, key: K, value: V, store: &B) -> Result<Rc<Self>>
where
K: DeserializeOwned + Clone + AsRef<[u8]>,
V: DeserializeOwned + Clone,
Expand Down Expand Up @@ -261,50 +266,47 @@ where
(mask & self.bitmask).count_ones()
}

#[async_recursion(?Send)]
async fn set_value<B: BlockStore>(
&self,
hashnibbles: &mut HashNibbles,
fn set_value<'a, B: BlockStore>(
self: Rc<Self>,
hashnibbles: &'a mut HashNibbles,
key: K,
value: V,
store: &B,
) -> Result<Rc<Self>>
store: &'a B,
) -> LocalBoxFuture<'a, Result<Rc<Self>>>
where
K: DeserializeOwned + Clone + AsRef<[u8]>,
V: DeserializeOwned + Clone,
K: DeserializeOwned + Clone + AsRef<[u8]> + 'a,
V: DeserializeOwned + Clone + 'a,
H: 'a,
{
let bit_index = hashnibbles.try_next()?;
let value_index = self.get_value_index(bit_index);
Box::pin(async move {
let bit_index = hashnibbles.try_next()?;
let value_index = self.get_value_index(bit_index);

debug!(
"set_value: bit_index = {}, value_index = {}",
bit_index, value_index
);
debug!(
"set_value: bit_index = {}, value_index = {}",
bit_index, value_index
);

// If the bit is not set yet, insert a new pointer.
if !self.bitmask[bit_index] {
let mut node = self.clone();
let mut node = Rc::try_unwrap(self).unwrap_or_else(|rc| (*rc).clone());

node.pointers
.insert(value_index, Pointer::Values(vec![Pair { key, value }]));
// If the bit is not set yet, insert a new pointer.
if !node.bitmask[bit_index] {
node.pointers
.insert(value_index, Pointer::Values(vec![Pair { key, value }]));

node.bitmask.set(bit_index, true);
node.bitmask.set(bit_index, true);

return Ok(Rc::new(node));
}
return Ok(Rc::new(node));
}

Ok(match &self.pointers[value_index] {
Pointer::Values(values) => {
let mut node = self.clone();
let pointers: Pointer<_, _, H> = {
let mut values = (*values).clone();
match &mut node.pointers[value_index] {
Pointer::Values(values) => {
if let Some(i) = values
.iter()
.position(|p| &H::hash(&p.key) == hashnibbles.digest)
{
// If the key is already present, update the value.
values[i] = Pair::new(key, value);
Pointer::Values(values)
} else {
// Otherwise, insert the new value if bucket is not full. Create new node if it is.
if values.len() < HAMT_VALUES_BUCKET_SIZE {
Expand All @@ -314,11 +316,13 @@ where
.position(|p| &H::hash(&p.key) > hashnibbles.digest)
.unwrap_or(values.len());
values.insert(index, Pair::new(key, value));
Pointer::Values(values)
} else {
// If values has reached threshold, we need to create a node link that splits it.
let mut sub_node = Rc::new(Node::<K, V, H>::default());
let cursor = hashnibbles.get_cursor();
// We can take because
// Pointer::Values() gets replaced with Pointer::Link at the end
let values = std::mem::take(values);
for Pair { key, value } in
values.into_iter().chain(Some(Pair::new(key, value)))
{
Expand All @@ -327,21 +331,18 @@ where
sub_node =
sub_node.set_value(hashnibbles, key, value, store).await?;
}
Pointer::Link(Link::from(sub_node))
node.pointers[value_index] = Pointer::Link(Link::from(sub_node));
}
}
};

node.pointers[value_index] = pointers;
Rc::new(node)
}
Pointer::Link(link) => {
let child = Rc::clone(link.resolve_value(store).await?);
let child = child.set_value(hashnibbles, key, value, store).await?;
let mut node = self.clone();
node.pointers[value_index] = Pointer::Link(Link::from(child));
Rc::new(node)
}
Pointer::Link(link) => {
let child = Rc::clone(link.resolve_value(store).await?);
let child = child.set_value(hashnibbles, key, value, store).await?;
node.pointers[value_index] = Pointer::Link(Link::from(child));
}
}

Ok(Rc::new(node))
})
}

Expand Down Expand Up @@ -376,76 +377,81 @@ where
}
}

#[async_recursion(?Send)]
async fn remove_value<'a, 'b, B: BlockStore>(
self: &'a Rc<Self>,
hashnibbles: &'b mut HashNibbles,
store: &B,
) -> Result<(Rc<Self>, Option<Pair<K, V>>)>
fn remove_value<'a, B: BlockStore>(
self: Rc<Self>,
hashnibbles: &'a mut HashNibbles,
store: &'a B,
) -> LocalBoxFuture<'a, Result<RemovedResult<K, V, H>>>
matheus23 marked this conversation as resolved.
Show resolved Hide resolved
where
K: DeserializeOwned + Clone + AsRef<[u8]>,
V: DeserializeOwned + Clone,
K: DeserializeOwned + Clone + AsRef<[u8]> + 'a,
V: DeserializeOwned + Clone + 'a,
H: 'a,
{
let bit_index = hashnibbles.try_next()?;
Box::pin(async move {
let bit_index = hashnibbles.try_next()?;

// If the bit is not set yet, return None.
if !self.bitmask[bit_index] {
return Ok((Rc::clone(self), None));
}

let value_index = self.get_value_index(bit_index);
Ok(match &self.pointers[value_index] {
Pointer::Values(values) => {
let mut node = (**self).clone();
let value = if values.len() == 1 {
// If the key doesn't match, return without removing.
if &H::hash(&values[0].key) != hashnibbles.digest {
return Ok((Rc::clone(self), None));
}
// If there is only one value, we can remove the entire pointer.
node.bitmask.set(bit_index, false);
match node.pointers.remove(value_index) {
Pointer::Values(mut values) => Some(values.pop().unwrap()),
_ => unreachable!(),
}
} else {
// Otherwise, remove just the value.
let mut values = (*values).clone();
values
.iter()
.position(|p| &H::hash(&p.key) == hashnibbles.digest)
.map(|i| {
let value = values.remove(i);
node.pointers[value_index] = Pointer::Values(values);
value
})
};

(Rc::new(node), value)
// If the bit is not set yet, return None.
if !self.bitmask[bit_index] {
matheus23 marked this conversation as resolved.
Show resolved Hide resolved
return Ok((self, None));
}
Pointer::Link(link) => {
let child = Rc::clone(link.resolve_value(store).await?);
let (child, value) = child.remove_value(hashnibbles, store).await?;

let mut node = (**self).clone();
if value.is_some() {
// If something has been deleted, we attempt toc canonicalize the pointer.
if let Some(pointer) =
Pointer::Link(Link::from(child)).canonicalize(store).await?
{
node.pointers[value_index] = pointer;

let value_index = self.get_value_index(bit_index);

let mut node = Rc::try_unwrap(self).unwrap_or_else(|rc| (*rc).clone());

let removed = match &mut node.pointers[value_index] {
Pointer::Values(values) => {
if values.len() == 1 {
matheus23 marked this conversation as resolved.
Show resolved Hide resolved
// If the key doesn't match, return without removing.
if &H::hash(&values[0].key) != hashnibbles.digest {
None
} else {
// If there is only one value, we can remove the entire pointer.
node.bitmask.set(bit_index, false);
match node.pointers.remove(value_index) {
Pointer::Values(mut values) => Some(values.pop().unwrap()),
_ => unreachable!(),
}
}
} else {
// This is None if the pointer now points to an empty node.
// In that case, we remove it from the parent.
node.bitmask.set(bit_index, false);
node.pointers.remove(value_index);
// Otherwise, remove just the value.
match values
.iter()
.position(|p| &H::hash(&p.key) == hashnibbles.digest)
{
Some(i) => {
let value = values.remove(i);
// We can take here because we replace the node.pointers here afterwards anyway
let values = std::mem::take(values);
node.pointers[value_index] = Pointer::Values(values);
Some(value)
}
None => None,
}
}
} else {
node.pointers[value_index] = Pointer::Link(Link::from(child))
};

(Rc::new(node), value)
}
}
Pointer::Link(link) => {
let child = Rc::clone(link.resolve_value(store).await?);
let (child, removed) = child.remove_value(hashnibbles, store).await?;
if removed.is_some() {
// If something has been deleted, we attempt toc canonicalize the pointer.
if let Some(pointer) =
Pointer::Link(Link::from(child)).canonicalize(store).await?
{
node.pointers[value_index] = pointer;
} else {
// This is None if the pointer now points to an empty node.
// In that case, we remove it from the parent.
node.bitmask.set(bit_index, false);
node.pointers.remove(value_index);
}
} else {
node.pointers[value_index] = Pointer::Link(Link::from(child))
};
removed
}
};
Ok((Rc::new(node), removed))
})
}
}
Expand Down