Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ ed25519-dalek = { version = "2.1.1", features = ["rand_core"] }
futures = "0.3.27"
futures-timer = "3.0.3"
indexmap = { version = "2.9.0", features = ["std"] }
ip_network = "0.4"
libc = "0.2.158"
mockall = "0.13.1"
multiaddr = "0.17.0"
Expand Down
171 changes: 148 additions & 23 deletions src/transport/manager/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

use crate::{error::DialError, PeerId};

use ip_network::IpNetwork;
use multiaddr::{Multiaddr, Protocol};
use multihash::Multihash;

Expand All @@ -38,8 +39,14 @@ pub mod scores {

/// Score for providing an invalid address.
///
/// This address can never be reached.
/// This address can never be reached and is effectively banned.
pub const ADDRESS_FAILURE: i32 = i32::MIN;

/// Initial score for public/global addresses.
///
/// This gives public addresses a slight priority over private addresses
/// when all addresses are untested (private addresses start at 0).
pub const PUBLIC_ADDRESS_BONUS: i32 = 1i32;
}

#[allow(clippy::derived_hash_with_manual_eq)]
Expand Down Expand Up @@ -70,7 +77,7 @@ impl AddressRecord {
address
};

Self { address, score }
Self::from_raw_multiaddr_with_score(address, score)
}

/// Create `AddressRecord` from `Multiaddr`.
Expand All @@ -82,10 +89,7 @@ impl AddressRecord {
return None;
}

Some(AddressRecord {
address,
score: 0i32,
})
Some(Self::from_raw_multiaddr_with_score(address, 0))
}

/// Create `AddressRecord` from `Multiaddr`.
Expand All @@ -94,10 +98,7 @@ impl AddressRecord {
///
/// Please consider using [`Self::from_multiaddr`] from the transport manager code.
pub fn from_raw_multiaddr(address: Multiaddr) -> AddressRecord {
AddressRecord {
address,
score: 0i32,
}
Self::from_raw_multiaddr_with_score(address, 0)
}

/// Create `AddressRecord` from `Multiaddr`.
Expand All @@ -106,7 +107,7 @@ impl AddressRecord {
///
/// Please consider using [`Self::from_multiaddr`] from the transport manager code.
pub fn from_raw_multiaddr_with_score(address: Multiaddr, score: i32) -> AddressRecord {
AddressRecord { address, score }
Self { address, score }
}

/// Get address score.
Expand All @@ -122,8 +123,29 @@ impl AddressRecord {

/// Update score of an address.
pub fn update_score(&mut self, score: i32) {
self.score = self.score.saturating_add(score);
self.score = score;
}
}

/// Check if a multiaddr represents a global/public address.
///
/// DNS addresses are considered potentially public.
fn is_global_multiaddr(address: &Multiaddr) -> bool {
for protocol in address.iter() {
match protocol {
Protocol::Ip4(ip) => return IpNetwork::from(ip).is_global(),
Protocol::Ip6(ip) => return IpNetwork::from(ip).is_global(),
// DNS addresses could resolve to public IPs, treat as potentially public.
// Ideally we need to resolve DNS to check the actual IPs. However, this
// is a more complex operation that requires async DNS resolution in the
// transport manager context / transport layer.
Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_) => return true,
_ => continue,
}
}

// Consider the address as non-global if no IP or DNS component is found
false
}

impl PartialEq for AddressRecord {
Expand Down Expand Up @@ -219,14 +241,31 @@ impl AddressStore {

/// Insert the address record into [`AddressStore`] with the provided score.
///
/// If the address is not in the store, it will be inserted.
/// Otherwise, the score and connection ID will be updated.
/// If the address is not in the store, it will be inserted with a bonus for public addresses.
/// Otherwise, the score will be updated only for connection events (non-zero scores),
/// not for re-adding the same address which should not overwrite connection history.
pub fn insert(&mut self, record: AddressRecord) {
if let Entry::Occupied(mut occupied) = self.addresses.entry(record.address.clone()) {
occupied.get_mut().update_score(record.score);
// Only update score for connection events (non-zero scores).
// Re-adding an address (score 0) via rediscovery should not wipe out
// connection success/failure history.
if record.score != 0 {
occupied.get_mut().update_score(record.score);
}
return;
}

// Reward public addresses with a bonus.
let is_public = is_global_multiaddr(&record.address);
let record = if is_public {
AddressRecord {
score: record.score.saturating_add(scores::PUBLIC_ADDRESS_BONUS),
..record
}
} else {
record
};

// The eviction algorithm favours addresses with higher scores.
//
// This algorithm has the following implications:
Expand Down Expand Up @@ -275,10 +314,10 @@ mod tests {
let peer = PeerId::random();
let address = std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(
rng.gen_range(1..=255),
rng.gen_range(0..=255),
10,
rng.gen_range(0..=255),
rng.gen_range(0..=255),
rng.gen_range(1..=255),
),
rng.gen_range(1..=65535),
));
Expand All @@ -297,10 +336,10 @@ mod tests {
let peer = PeerId::random();
let address = std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(
rng.gen_range(1..=255),
rng.gen_range(0..=255),
10,
rng.gen_range(0..=255),
rng.gen_range(0..=255),
rng.gen_range(1..=255),
),
rng.gen_range(1..=65535),
));
Expand All @@ -320,10 +359,10 @@ mod tests {
let peer = PeerId::random();
let address = std::net::SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(
rng.gen_range(1..=255),
rng.gen_range(0..=255),
10,
rng.gen_range(0..=255),
rng.gen_range(0..=255),
rng.gen_range(1..=255),
),
rng.gen_range(1..=65535),
));
Expand Down Expand Up @@ -479,12 +518,98 @@ mod tests {
assert_eq!(store.addresses.len(), 1);
assert_eq!(store.addresses.get(record.address()).unwrap(), &record);

// This time the record is updated.
// This time the record score is replaced (not accumulated).
store.insert(record.clone());

assert_eq!(store.addresses.len(), 1);
let store_record = store.addresses.get(record.address()).unwrap();
assert_eq!(store_record.score, record.score * 2);
assert_eq!(store_record.score, record.score);
}

#[test]
fn insert_record_does_not_accumulate_public_bonus() {
let mut store = AddressStore::new();
let peer = PeerId::random();

// Create a public address (8.8.8.8 is global) using from_multiaddr.
// The bonus is NOT applied at construction time, only when first inserted.
let address = Multiaddr::empty()
.with(Protocol::Ip4(Ipv4Addr::new(8, 8, 8, 8)))
.with(Protocol::Tcp(9999))
.with(Protocol::P2p(
multihash::Multihash::from_bytes(&peer.to_bytes()).unwrap(),
));

let record = AddressRecord::from_multiaddr(address.clone()).unwrap();
assert_eq!(record.score, 0);

store.insert(record.clone());
assert_eq!(store.addresses.len(), 1);
// Bonus applied on first insert.
assert_eq!(
store.addresses.get(&address).unwrap().score,
scores::PUBLIC_ADDRESS_BONUS
);

// Re-adding the same address should NOT accumulate the bonus.
let record2 = AddressRecord::from_multiaddr(address.clone()).unwrap();
store.insert(record2);

assert_eq!(store.addresses.len(), 1);
// Score should still be 1, not 2.
assert_eq!(
store.addresses.get(&address).unwrap().score,
scores::PUBLIC_ADDRESS_BONUS
);

// However, connection events should still update (replace) the score.
let connection_record =
AddressRecord::new(&peer, address.clone(), scores::CONNECTION_ESTABLISHED);
store.insert(connection_record);

assert_eq!(store.addresses.len(), 1);
// Score should now be CONNECTION_ESTABLISHED (bonus only applied on first insert).
assert_eq!(
store.addresses.get(&address).unwrap().score,
scores::CONNECTION_ESTABLISHED
);
}

#[test]
fn rediscovery_does_not_wipe_dial_failure() {
let mut store = AddressStore::new();
let peer = PeerId::random();

// Public address (8.8.8.8 is global).
let address = Multiaddr::empty()
.with(Protocol::Ip4(Ipv4Addr::new(8, 8, 8, 8)))
.with(Protocol::Tcp(9999))
.with(Protocol::P2p(
multihash::Multihash::from_bytes(&peer.to_bytes()).unwrap(),
));

// First, add the address normally.
let record = AddressRecord::from_multiaddr(address.clone()).unwrap();
store.insert(record);
assert_eq!(
store.addresses.get(&address).unwrap().score,
scores::PUBLIC_ADDRESS_BONUS
);

// Dial failure occurs (bonus only applied on first insert, not on updates).
let failure_record = AddressRecord::new(&peer, address.clone(), scores::CONNECTION_FAILURE);
store.insert(failure_record);
let failure_score = scores::CONNECTION_FAILURE;
assert_eq!(store.addresses.get(&address).unwrap().score, failure_score);

// Address is rediscovered via Kademlia (creates record with score 0).
// This should NOT wipe out the dial failure score.
let rediscovered = AddressRecord::from_multiaddr(address.clone()).unwrap();
assert_eq!(rediscovered.score, 0);
store.insert(rediscovered);

// Score should still reflect the failure, not 0.
assert_eq!(store.addresses.get(&address).unwrap().score, failure_score);
}

#[test]
Expand Down
Loading