Skip to content

Commit

Permalink
More insight into Kademlia queries. (#1567)
Browse files Browse the repository at this point in the history
* [libp2p-kad] Provide more insight and control into Kademlia queries.

More insight: The API allows iterating over the active queries and
inspecting their state and execution statistics.

More control: The API allows aborting queries prematurely
at any time.

To that end, API operations that initiate new queries return the query ID
and multi-phase queries such as `put_record` retain the query ID across all
phases, each phase being executed by a new (internal) query.

* Cleanup

* Cleanup

* Update examples and re-exports.

* Incorporate review feedback.

* Update CHANGELOG

* Update CHANGELOG

Co-authored-by: Max Inden <[email protected]>
  • Loading branch information
romanb and mxinden authored May 16, 2020
1 parent c271f6f commit 3a96ebf
Show file tree
Hide file tree
Showing 9 changed files with 921 additions and 362 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
has no effect.
[PR 1536](https://github.com/libp2p/rust-libp2p/pull/1536)

- `libp2p-kad`: Provide more insight into, and control of, the execution of
queries. All query results are now wrapped in `KademliaEvent::QueryResult`.
As a side-effect of these changes and for as long as the record storage
API is not asynchronous, local storage errors on `put_record` are reported
synchronously in a `Result`, instead of being reported asynchronously by
an event.
[PR 1567](https://github.com/libp2p/rust-libp2p/pull/1567)

- `libp2p-tcp`: On listeners started with an IPv6 multi-address the socket
option `IPV6_V6ONLY` is set to true. Instead of relying on IPv4-mapped IPv6
address support, two listeners can be started if IPv4 and IPv6 should both
Expand Down
49 changes: 30 additions & 19 deletions examples/distributed-key-value-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,15 @@
use async_std::{io, task};
use futures::prelude::*;
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{record::Key, Kademlia, KademliaEvent, PutRecordOk, Quorum, Record};
use libp2p::kad::{
record::Key,
Kademlia,
KademliaEvent,
PutRecordOk,
QueryResult,
Quorum,
Record
};
use libp2p::{
NetworkBehaviour,
PeerId,
Expand Down Expand Up @@ -76,26 +84,29 @@ fn main() -> Result<(), Box<dyn Error>> {
// Called when `kademlia` produces an event.
fn inject_event(&mut self, message: KademliaEvent) {
match message {
KademliaEvent::GetRecordResult(Ok(result)) => {
for Record { key, value, .. } in result.records {
KademliaEvent::QueryResult { result, .. } => match result {
QueryResult::GetRecord(Ok(ok)) => {
for Record { key, value, .. } in ok.records {
println!(
"Got record {:?} {:?}",
std::str::from_utf8(key.as_ref()).unwrap(),
std::str::from_utf8(&value).unwrap(),
);
}
}
QueryResult::GetRecord(Err(err)) => {
eprintln!("Failed to get record: {:?}", err);
}
QueryResult::PutRecord(Ok(PutRecordOk { key })) => {
println!(
"Got record {:?} {:?}",
std::str::from_utf8(key.as_ref()).unwrap(),
std::str::from_utf8(&value).unwrap(),
"Successfully put record {:?}",
std::str::from_utf8(key.as_ref()).unwrap()
);
}
}
KademliaEvent::GetRecordResult(Err(err)) => {
eprintln!("Failed to get record: {:?}", err);
}
KademliaEvent::PutRecordResult(Ok(PutRecordOk { key })) => {
println!(
"Successfully put record {:?}",
std::str::from_utf8(key.as_ref()).unwrap()
);
}
KademliaEvent::PutRecordResult(Err(err)) => {
eprintln!("Failed to put record: {:?}", err);
QueryResult::PutRecord(Err(err)) => {
eprintln!("Failed to put record: {:?}", err);
}
_ => {}
}
_ => {}
}
Expand Down Expand Up @@ -188,7 +199,7 @@ fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
publisher: None,
expires: None,
};
kademlia.put_record(record, Quorum::One);
kademlia.put_record(record, Quorum::One).expect("Failed to store record locally.");
}
_ => {
eprintln!("expected GET or PUT");
Expand Down
13 changes: 11 additions & 2 deletions examples/ipfs-kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@ use libp2p::{
identity,
build_development_transport
};
use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, GetClosestPeersError};
use libp2p::kad::{
Kademlia,
KademliaConfig,
KademliaEvent,
GetClosestPeersError,
QueryResult,
};
use libp2p::kad::record::store::MemoryStore;
use std::{env, error::Error, time::Duration};

Expand Down Expand Up @@ -91,7 +97,10 @@ fn main() -> Result<(), Box<dyn Error>> {
task::block_on(async move {
loop {
let event = swarm.next().await;
if let KademliaEvent::GetClosestPeersResult(result) = event {
if let KademliaEvent::QueryResult {
result: QueryResult::GetClosestPeers(result),
..
} = event {
match result {
Ok(ok) =>
if !ok.peers.is_empty() {
Expand Down
Loading

0 comments on commit 3a96ebf

Please sign in to comment.