Skip to content

Commit

Permalink
feat(kad): add limit option for getting providers
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Jun 27, 2022
1 parent 7190952 commit d9b00ab
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 28 deletions.
96 changes: 71 additions & 25 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,17 +920,23 @@ where
///
/// The result of this operation is delivered in a
/// reported via [`KademliaEvent::OutboundQueryCompleted{QueryResult::GetProviders}`].
pub fn get_providers(&mut self, key: record::Key) -> QueryId {
pub fn get_providers(&mut self, key: record::Key, limit: ProviderLimit) -> QueryId {
let providers = self
.store
.providers(&key)
.into_iter()
.filter(|p| !p.is_expired(Instant::now()))
.map(|p| p.provider)
.collect();
.map(|p| p.provider);

let providers = match limit {
ProviderLimit::None => providers.collect(),
ProviderLimit::N(limit) => providers.take(limit.into()).collect(),
};

let info = QueryInfo::GetProviders {
key: key.clone(),
providers,
limit,
};
let target = kbucket::Key::new(key);
let peers = self.kbuckets.closest_keys(&target);
Expand Down Expand Up @@ -1259,17 +1265,19 @@ where
})),
}),

QueryInfo::GetProviders { key, providers } => {
Some(KademliaEvent::OutboundQueryCompleted {
id: query_id,
stats: result.stats,
result: QueryResult::GetProviders(Ok(GetProvidersOk {
key,
providers,
closest_peers: result.peers.collect(),
})),
})
}
QueryInfo::GetProviders {
key,
providers,
limit: _,
} => Some(KademliaEvent::OutboundQueryCompleted {
id: query_id,
stats: result.stats,
result: QueryResult::GetProviders(Ok(GetProvidersOk {
key,
providers,
closest_peers: result.peers.collect(),
})),
}),

QueryInfo::AddProvider {
context,
Expand Down Expand Up @@ -1554,17 +1562,19 @@ where
})),
}),

QueryInfo::GetProviders { key, providers } => {
Some(KademliaEvent::OutboundQueryCompleted {
id: query_id,
stats: result.stats,
result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
key,
providers,
closest_peers: result.peers.collect(),
})),
})
}
QueryInfo::GetProviders {
key,
providers,
limit: _,
} => Some(KademliaEvent::OutboundQueryCompleted {
id: query_id,
stats: result.stats,
result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
key,
providers,
closest_peers: result.peers.collect(),
})),
}),
}
}

Expand Down Expand Up @@ -2332,6 +2342,31 @@ where
{
query.on_success(&peer_id, vec![])
}

if let QueryInfo::GetProviders {
key: _,
providers,
limit,
} = &query.inner.info
{
match limit {
ProviderLimit::None => {
// No limit, so wait for enough peers to respond.
}
ProviderLimit::N(n) => {
// Check if we have enough providers.
if usize::from(*n) <= providers.len() {
debug!(
"found enough providers {}/{}, finishing",
providers.len(),
n
);
query.finish();
}
}
}
}

if self.connected_peers.contains(&peer_id) {
self.queued_events
.push_back(NetworkBehaviourAction::NotifyHandler {
Expand Down Expand Up @@ -2364,6 +2399,15 @@ where
}
}

/// Specifies the number of provider records fetched.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ProviderLimit {
/// No limit on the number of records.
None,
/// Finishes the query as soon as this many records have been found.
N(NonZeroUsize),
}

/// A quorum w.r.t. the configured replication factor specifies the minimum
/// number of distinct nodes that must be successfully contacted in order
/// for a query to succeed.
Expand Down Expand Up @@ -2863,6 +2907,8 @@ pub enum QueryInfo {
key: record::Key,
/// The found providers.
providers: HashSet<PeerId>,
/// The limit of how many providers to find,
limit: ProviderLimit,
},

/// A (repeated) query initiated by [`Kademlia::start_providing`].
Expand Down
88 changes: 86 additions & 2 deletions protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1333,7 +1333,7 @@ fn network_behaviour_inject_address_change() {
}

#[test]
fn get_providers() {
fn get_providers_single() {
fn prop(key: record::Key) {
let (_, mut single_swarm) = build_node();
single_swarm
Expand All @@ -1352,7 +1352,9 @@ fn get_providers() {
}
});

let query_id = single_swarm.behaviour_mut().get_providers(key.clone());
let query_id = single_swarm
.behaviour_mut()
.get_providers(key.clone(), ProviderLimit::None);

block_on(async {
match single_swarm.next().await.unwrap() {
Expand All @@ -1379,3 +1381,85 @@ fn get_providers() {
}
QuickCheck::new().tests(10).quickcheck(prop as fn(_))
}

fn get_providers_limit<const N: usize>() {
fn prop<const N: usize>(key: record::Key) {
let mut swarms = build_nodes(3);

// Let first peer know of second peer and second peer know of third peer.
for i in 0..2 {
let (peer_id, address) = (
Swarm::local_peer_id(&swarms[i + 1].1).clone(),
swarms[i + 1].0.clone(),
);
swarms[i].1.behaviour_mut().add_address(&peer_id, address);
}

// Drop the swarm addresses.
let mut swarms = swarms
.into_iter()
.map(|(_addr, swarm)| swarm)
.collect::<Vec<_>>();

// Provide the content on peer 2 and 3.
for i in 1..3 {
swarms[i]
.behaviour_mut()
.start_providing(key.clone())
.expect("could not provide");
}

// Query with expecting a single provider.
let query_id = swarms[0]
.behaviour_mut()
.get_providers(key.clone(), ProviderLimit::N(N.try_into().unwrap()));

block_on(poll_fn(move |ctx| {
for (i, swarm) in swarms.iter_mut().enumerate() {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryCompleted {
id,
result:
QueryResult::GetProviders(Ok(GetProvidersOk {
key: found_key,
providers,
..
})),
..
},
))) if i == 0 && id == query_id => {
// There are a total of 2 providers.
assert_eq!(providers.len(), std::cmp::min(N, 2));
assert_eq!(key, found_key);
// Providers should be either 2 or 3
assert_ne!(swarm.local_peer_id(), providers.iter().next().unwrap());
return Poll::Ready(());
}
Poll::Ready(..) => {}
Poll::Pending => break,
}
}
}
Poll::Pending
}));
}

QuickCheck::new().tests(10).quickcheck(prop::<N> as fn(_))
}

#[test]
fn get_providers_limit_n_1() {
get_providers_limit::<1>();
}

#[test]
fn get_providers_limit_n_2() {
get_providers_limit::<1>();
}

#[test]
fn get_providers_limit_n_5() {
get_providers_limit::<5>();
}
2 changes: 1 addition & 1 deletion protocols/kad/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub use behaviour::{
};
pub use behaviour::{
Kademlia, KademliaBucketInserts, KademliaCaching, KademliaConfig, KademliaEvent,
KademliaStoreInserts, Quorum,
KademliaStoreInserts, ProviderLimit, Quorum,
};
pub use protocol::KadConnectionType;
pub use query::QueryId;
Expand Down

0 comments on commit d9b00ab

Please sign in to comment.