Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

- Expose Distance private field U256 to public.
See [PR 5705](https://github.com/libp2p/rust-libp2p/pull/5705).
- Fix systematic memory allocation when iterating over `KBuckets`.
See [PR 5715](https://github.com/libp2p/rust-libp2p/pull/5715).

## 0.47.0

Expand Down
2 changes: 2 additions & 0 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ impl Config {
/// Sets the configuration for the k-buckets.
///
/// * Default to K_VALUE.
///
/// **WARNING**: setting a `size` higher that `K_VALUE` may imply additional memory allocations.
pub fn set_kbucket_size(&mut self, size: NonZeroUsize) -> &mut Self {
self.kbucket_config.set_bucket_size(size);
self
Expand Down
109 changes: 72 additions & 37 deletions protocols/kad/src/kbucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ use std::{collections::VecDeque, num::NonZeroUsize, time::Duration};
use bucket::KBucket;
pub use bucket::NodeStatus;
pub use entry::*;
use smallvec::SmallVec;
use web_time::Instant;

/// Maximum number of k-buckets.
Expand Down Expand Up @@ -282,11 +283,8 @@ where
iter: None,
table: self,
buckets_iter: ClosestBucketsIter::new(distance),
fmap: move |b: &KBucket<TKey, _>| -> Vec<_> {
let mut vec = Vec::with_capacity(bucket_size);
vec.extend(b.iter().map(|(n, _)| n.key.clone()));
vec
},
fmap: |(n, _status): (&Node<TKey, TVal>, NodeStatus)| n.key.clone(),
bucket_size,
}
}

Expand All @@ -307,15 +305,11 @@ where
iter: None,
table: self,
buckets_iter: ClosestBucketsIter::new(distance),
fmap: move |b: &KBucket<_, TVal>| -> Vec<_> {
b.iter()
.take(bucket_size)
.map(|(n, status)| EntryView {
node: n.clone(),
status,
})
.collect()
fmap: |(n, status): (&Node<TKey, TVal>, NodeStatus)| EntryView {
node: n.clone(),
status,
},
bucket_size,
}
}

Expand Down Expand Up @@ -358,10 +352,12 @@ struct ClosestIter<'a, TTarget, TKey, TVal, TMap, TOut> {
/// distance of the local key to the target.
buckets_iter: ClosestBucketsIter,
/// The iterator over the entries in the currently traversed bucket.
iter: Option<std::vec::IntoIter<TOut>>,
iter: Option<ClosestIterBuffer<TOut>>,
/// The projection function / mapping applied on each bucket as
/// it is encountered, producing the next `iter`ator.
fmap: TMap,
/// The maximal number of nodes that a bucket can contain.
bucket_size: usize,
}

/// An iterator over the bucket indices, in the order determined by the `Distance` of
Expand Down Expand Up @@ -463,41 +459,80 @@ where
TTarget: AsRef<KeyBytes>,
TKey: Clone + AsRef<KeyBytes>,
TVal: Clone,
TMap: Fn(&KBucket<TKey, TVal>) -> Vec<TOut>,
TMap: Fn((&Node<TKey, TVal>, NodeStatus)) -> TOut,
TOut: AsRef<KeyBytes>,
{
type Item = TOut;

fn next(&mut self) -> Option<Self::Item> {
loop {
match &mut self.iter {
Some(iter) => match iter.next() {
Some(k) => return Some(k),
None => self.iter = None,
},
None => {
if let Some(i) = self.buckets_iter.next() {
let bucket = &mut self.table.buckets[i.get()];
if let Some(applied) = bucket.apply_pending() {
self.table.applied_pending.push_back(applied)
}
let mut v = (self.fmap)(bucket);
v.sort_by(|a, b| {
self.target
.as_ref()
.distance(a.as_ref())
.cmp(&self.target.as_ref().distance(b.as_ref()))
});
self.iter = Some(v.into_iter());
} else {
return None;
}
let (mut buffer, bucket_index) = if let Some(mut iter) = self.iter.take() {
if let Some(next) = iter.next() {
self.iter = Some(iter);
return Some(next);
}

let bucket_index = self.buckets_iter.next()?;

// Reusing the same buffer so if there were any allocation, it only happen once over
// a `ClosestIter` life.
iter.buffer.clear();

(iter.buffer, bucket_index)
} else {
let bucket_index = self.buckets_iter.next()?;

// Allocation only occurs if `kbucket_size` is greater than `K_VALUE`.
(SmallVec::with_capacity(self.bucket_size), bucket_index)
};

let bucket = &mut self.table.buckets[bucket_index.get()];
if let Some(applied) = bucket.apply_pending() {
self.table.applied_pending.push_back(applied)
}

buffer.extend(
bucket
.iter()
.take(self.bucket_size)
.map(|e| (self.fmap)(e))
.map(Some),
);
buffer.sort_by(|a, b| {
let a = a.as_ref().expect("just initialized");
let b = b.as_ref().expect("just initialized");
self.target
.as_ref()
.distance(a.as_ref())
.cmp(&self.target.as_ref().distance(b.as_ref()))
});

self.iter = Some(ClosestIterBuffer::new(buffer));
}
}
}

struct ClosestIterBuffer<TOut> {
buffer: SmallVec<[Option<TOut>; K_VALUE.get()]>,
index: usize,
}

impl<TOut> ClosestIterBuffer<TOut> {
fn new(buffer: SmallVec<[Option<TOut>; K_VALUE.get()]>) -> Self {
Self { buffer, index: 0 }
}
}

impl<TOut> Iterator for ClosestIterBuffer<TOut> {
type Item = TOut;

fn next(&mut self) -> Option<Self::Item> {
let entry = self.buffer.get_mut(self.index)?;
self.index += 1;
entry.take()
}
}

/// A reference to a bucket.
pub struct KBucketRef<'a, TKey, TVal> {
index: BucketIndex,
Expand Down
Loading