Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check DHT for gossip peers #86

Merged
merged 4 commits into from
May 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod chain_list;
pub mod config;
pub mod controller;
pub mod metrics;
pub mod multi_ticker;
pub mod multiaddr_ext;
pub mod p2p;
pub mod premints;
Expand Down
126 changes: 126 additions & 0 deletions src/multi_ticker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use std::collections::HashMap;

use std::hash::Hash;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;

use futures::Stream;
use futures_ticker::Ticker;
use futures_util::stream::{select_all, FusedStream};
use futures_util::StreamExt;

pub struct MultiTicker<T: Copy + Hash + Eq + Unpin + 'static> {
tickers: HashMap<T, Ticker>,
}

impl<T> MultiTicker<T>
where
T: Copy + Hash + Eq + Unpin + Sized + 'static,
{
pub fn new<I>(tickers: I) -> Self
where
I: IntoIterator<Item = (T, Ticker)>,
{
MultiTicker {
tickers: tickers.into_iter().collect::<HashMap<_, _>>(),
}
}
}

struct KeyedStream<T, S>(T, S);

impl<T, S> Stream for KeyedStream<T, S>
where
T: Copy + Unpin,
S: Stream + Unpin,
{
type Item = (T, S::Item);

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let KeyedStream(key, stream) = self.get_mut();
match stream.poll_next_unpin(cx) {
Poll::Ready(Some(item)) => Poll::Ready(Some((*key, item))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(1, None)
}
}

impl<T> FusedStream for MultiTicker<T>
where
T: Copy + Hash + Eq + Unpin + Sized + 'static,
{
fn is_terminated(&self) -> bool {
self.tickers.is_empty()
}
}

impl<T: Copy + Hash + Eq + Unpin + 'static> MultiTicker<T> {}

impl<T: Copy + Hash + Eq + Unpin + 'static> Stream for MultiTicker<T> {
type Item = (T, Instant);

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
select_all(
self.get_mut()
.tickers
.iter_mut()
.map(|(k, v)| KeyedStream(*k, v)),
)
.poll_next_unpin(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.tickers.len(), None)
}
}

#[cfg(test)]
mod test {
use std::time::Duration;

use futures::executor::block_on;

use super::*;

#[test]
fn test_multi_ticker() {
#[derive(Clone, Copy, Eq, Debug, Hash, PartialEq)]
enum TickerId {
A,
B,
C,
}

let mut tickers = HashMap::new();
tickers.insert(TickerId::A, Ticker::new(Duration::from_millis(1000)));
tickers.insert(TickerId::B, Ticker::new(Duration::from_millis(2100)));
tickers.insert(TickerId::C, Ticker::new(Duration::from_millis(3200)));

let mut multi_ticker = MultiTicker { tickers };

let mut ticks = vec![];
let mut multi_ticker = Pin::new(&mut multi_ticker);
for _ in 0..5 {
ticks.push(block_on(multi_ticker.next()).unwrap());
}

println!("{:?}", ticks);

assert_eq!(
ticks,
vec![
(TickerId::A, ticks[0].1), // 1000
(TickerId::A, ticks[1].1), // 2000
(TickerId::B, ticks[2].1), // 2100
(TickerId::A, ticks[3].1), // 3000
(TickerId::C, ticks[4].1), // 3200
]
);
}
}
122 changes: 87 additions & 35 deletions src/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::hash::Hasher;
use std::time::Duration;

Expand All @@ -7,18 +8,17 @@ use futures_ticker::Ticker;
use itertools::Itertools;
use libp2p::autonat::NatStatus;
use libp2p::futures::StreamExt;
use libp2p::gossipsub::Version;
use libp2p::gossipsub::{IdentTopic, TopicHash, Version};
use libp2p::identify::Event;
use libp2p::identity::Keypair;
use libp2p::kad::store::{MemoryStore, RecordStore};
use libp2p::kad::GetProvidersOk::FoundProviders;
use libp2p::kad::{Addresses, ProviderRecord, QueryResult, Record, RecordKey};
use libp2p::multiaddr::{Error, Protocol};
use libp2p::request_response::{InboundRequestId, Message, ProtocolSupport, ResponseChannel};
use libp2p::kad::{Addresses, ProviderRecord, RecordKey};
use libp2p::multiaddr::Protocol;
use libp2p::request_response::{InboundRequestId, Message, ProtocolSupport};
use libp2p::swarm::behaviour::toggle::Toggle;
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
use libp2p::swarm::DialError::DialPeerConditionFalse;
use libp2p::swarm::{ConnectionId, NetworkBehaviour, NetworkInfo, SwarmEvent, ToSwarm};
use libp2p::swarm::{ConnectionId, NetworkBehaviour, NetworkInfo, SwarmEvent};
use libp2p::{
autonat, dcutr, gossipsub, kad, noise, relay, request_response, tcp, yamux, Multiaddr, PeerId,
StreamProtocol,
Expand All @@ -30,6 +30,7 @@ use tokio::select;

use crate::config::Config;
use crate::controller::{P2PEvent, SwarmCommand};
use crate::multi_ticker::MultiTicker;
use crate::multiaddr_ext::MultiaddrExt;
use crate::storage::QueryOptions;
use crate::types::{
Expand All @@ -51,12 +52,18 @@ pub struct MintpoolBehaviour {
dcutr: dcutr::Behaviour,
}

#[derive(Copy, Clone, PartialEq, Eq, Hash)]
enum SwarmTickers {
Bootstrap,
DiscoverGossipPeers,
}

pub struct SwarmController {
swarm: libp2p::Swarm<MintpoolBehaviour>,
command_receiver: tokio::sync::mpsc::Receiver<SwarmCommand>,
event_sender: tokio::sync::mpsc::Sender<P2PEvent>,
premint_names: Vec<PremintName>,
bootstrap_ticker: Ticker,
tickers: MultiTicker<SwarmTickers>,
config: Config,
}

Expand Down Expand Up @@ -94,11 +101,17 @@ impl SwarmController {
event_sender,
config: config.clone(),
premint_names: config.premint_names(),
bootstrap_ticker: Ticker::new(
// spec suggests to run bootstrap every 5 minutes
// first time bootstrap will trigger after first connection
Duration::from_secs(60 * 5),
),
tickers: MultiTicker::new(vec![
(
// documentation suggests bootstrapping every 5 minutes
SwarmTickers::Bootstrap,
Ticker::new(Duration::from_secs(60 * 5)),
),
(
SwarmTickers::DiscoverGossipPeers,
Ticker::new(Duration::from_secs(60)),
),
]),
}
}

Expand Down Expand Up @@ -216,20 +229,20 @@ impl SwarmController {
.listen_on(format!("/ip4/{listen_ip}/tcp/{port}").parse()?)?;

let registry_topic = announce_topic();
self.swarm
.behaviour_mut()
.gossipsub
.subscribe(&registry_topic)?;

for premint_name in self.premint_names.iter() {
let topic = premint_name.msg_topic();
self.swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
let claim_topic = premint_name.claims_topic();
self.swarm
.behaviour_mut()
.gossipsub
.subscribe(&claim_topic)?;
}
self.gossip_subscribe(&registry_topic)?;

// subscribe to all relevant topics
self.premint_names
.iter()
.flat_map(|name| vec![name.msg_topic(), name.claims_topic()])
.collect::<Vec<IdentTopic>>()
.iter()
.for_each(|topic| match self.gossip_subscribe(&topic) {
Ok(_) => {}
Err(err) => {
tracing::error!("Error subscribing to topic: {:?}", err);
}
});

self.run_loop().await;
Ok(())
Expand All @@ -245,14 +258,27 @@ impl SwarmController {
}
}
event = self.swarm.select_next_some() => self.handle_swarm_event(event).await,
_tick = self.bootstrap_ticker.next() => {
match self.swarm.behaviour_mut().kad.bootstrap() {
Ok(_) => {}
Err(err) => {
tracing::error!("Error bootstrapping kad: {:?}", err);
tick = self.tickers.select_next_some() => {
match tick {
(SwarmTickers::Bootstrap, _) => {
match self.swarm.behaviour_mut().kad.bootstrap() {
Ok(_) => {}
Err(err) => {
tracing::error!("Error bootstrapping kad: {:?}", err);
}
}
}
(SwarmTickers::DiscoverGossipPeers, _) => {
let b = self.swarm.behaviour_mut();

b.gossipsub.topics().for_each(|topic| {
// kad will automatically dial all discovered peers,
// gossipsub will automatically sync topics with new peers
b.kad.get_providers(Self::topic_to_record_key(topic));
});
}
}
},
}
}
}
}
Expand Down Expand Up @@ -433,6 +459,7 @@ impl SwarmController {
tracing::debug!("Discovered relay peer: {:?}", info);

for addr in info.listen_addrs {
tracing::debug!("Adding relay address: {:?}", addr);
relay_manager.add_address(peer_id, addr);
}
}
Expand All @@ -453,11 +480,11 @@ impl SwarmController {
}
_ => {}
}
tracing::debug!("Ping event: {:?}", event);
tracing::trace!("Ping event: {:?}", event);
}

SwarmEvent::Behaviour(MintpoolBehaviourEvent::Relay(event)) => {
tracing::info!("Relay event: {:?}", event);
tracing::debug!("Relay event: {:?}", event);
}

SwarmEvent::Behaviour(MintpoolBehaviourEvent::Autonat(event)) => {
Expand All @@ -470,7 +497,7 @@ impl SwarmController {
}

SwarmEvent::Behaviour(MintpoolBehaviourEvent::Dcutr(event)) => {
tracing::info!("Dcutr event: {:?}", event);
tracing::debug!("Dcutr event: {:?}", event);
}

other => {
Expand Down Expand Up @@ -815,6 +842,31 @@ impl SwarmController {
Ok(())
}

fn gossip_subscribe(&mut self, topic: &IdentTopic) -> eyre::Result<()> {
tracing::info!("Subscribing to topic: {}", topic.to_string());
let b = self.swarm.behaviour_mut();

b.gossipsub.subscribe(&topic)?;
b.kad
.start_providing(Self::topic_to_record_key(&topic.hash()))?;

Ok(())
}

fn gossip_unsubscribe(&mut self, topic: &IdentTopic) -> eyre::Result<()> {
let b = self.swarm.behaviour_mut();

b.gossipsub.unsubscribe(topic)?;
b.kad
.stop_providing(&Self::topic_to_record_key(&topic.hash()));

Ok(())
}

fn topic_to_record_key(topic: &TopicHash) -> RecordKey {
RecordKey::new(&format!("topic::{}", topic.to_string()).as_bytes())
}

// Makes a Response for a request to sync from another node
async fn make_sync_response(
&mut self,
Expand Down
Loading